2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/experimental/fibers/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
32 Timekeeper* getTimekeeperSingleton();
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37 other.core_ = nullptr;
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42 std::swap(core_, other.core_);
48 Future<T>::Future(T2&& val) : core_(nullptr) {
50 p.setValue(std::forward<T2>(val));
51 *this = p.getFuture();
56 typename std::enable_if<
57 folly::is_void_or_unit<T2>::value,
59 Future<T>::Future() : core_(nullptr) {
62 *this = p.getFuture();
67 Future<T>::~Future() {
72 void Future<T>::detach() {
74 core_->detachFuture();
80 void Future<T>::throwIfInvalid() const {
87 void Future<T>::setCallback_(F&& func) {
89 core_->setCallback(std::move(func));
96 typename std::enable_if<isFuture<F>::value,
97 Future<typename isFuture<T>::Inner>>::type
99 return then([](Future<typename isFuture<T>::Inner> internal_future) {
100 return internal_future;
106 // Variant: returns a value
107 // e.g. f.then([](Try<T>&& t){ return t.value(); });
109 template <typename F, typename R, bool isTry, typename... Args>
110 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
111 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
112 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
113 typedef typename R::ReturnsFuture::Inner B;
117 // wrap these so we can move them into the lambda
118 folly::MoveWrapper<Promise<B>> p;
119 folly::MoveWrapper<F> funcm(std::forward<F>(func));
121 // grab the Future now before we lose our handle on the Promise
122 auto f = p->getFuture();
124 f.setExecutor(getExecutor());
127 /* This is a bit tricky.
129 We can't just close over *this in case this Future gets moved. So we
130 make a new dummy Future. We could figure out something more
131 sophisticated that avoids making a new Future object when it can, as an
132 optimization. But this is correct.
134 core_ can't be moved, it is explicitly disallowed (as is copying). But
135 if there's ever a reason to allow it, this is one place that makes that
136 assumption and would need to be fixed. We use a standard shared pointer
137 for core_ (by copying it in), which means in essence obj holds a shared
138 pointer to itself. But this shouldn't leak because Promise will not
139 outlive the continuation, because Promise will setException() with a
140 broken Promise if it is destructed before completed. We could use a
141 weak pointer but it would have to be converted to a shared pointer when
142 func is executed (because the Future returned by func may possibly
143 persist beyond the callback, if it gets moved), and so it is an
144 optimization to just make it shared from the get-go.
146 We have to move in the Promise and func using the MoveWrapper
147 hack. (func could be copied but it's a big drag on perf).
149 Two subtle but important points about this design. detail::Core has no
150 back pointers to Future or Promise, so if Future or Promise get moved
151 (and they will be moved in performant code) we don't have to do
152 anything fancy. And because we store the continuation in the
153 detail::Core, not in the Future, we can execute the continuation even
154 after the Future has gone out of scope. This is an intentional design
155 decision. It is likely we will want to be able to cancel a continuation
156 in some circumstances, but I think it should be explicit not implicit
157 in the destruction of the Future used to create it.
160 [p, funcm](Try<T>&& t) mutable {
161 if (!isTry && t.hasException()) {
162 p->setException(std::move(t.exception()));
165 return (*funcm)(t.template get<isTry, Args>()...);
173 // Variant: returns a Future
174 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
176 template <typename F, typename R, bool isTry, typename... Args>
177 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
178 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
179 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
180 typedef typename R::ReturnsFuture::Inner B;
184 // wrap these so we can move them into the lambda
185 folly::MoveWrapper<Promise<B>> p;
186 folly::MoveWrapper<F> funcm(std::forward<F>(func));
188 // grab the Future now before we lose our handle on the Promise
189 auto f = p->getFuture();
191 f.setExecutor(getExecutor());
195 [p, funcm](Try<T>&& t) mutable {
196 if (!isTry && t.hasException()) {
197 p->setException(std::move(t.exception()));
200 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
201 // that didn't throw, now we can steal p
202 f2.setCallback_([p](Try<B>&& b) mutable {
203 p->setTry(std::move(b));
205 } catch (const std::exception& e) {
206 p->setException(exception_wrapper(std::current_exception(), e));
208 p->setException(exception_wrapper(std::current_exception()));
216 template <typename T>
217 template <typename R, typename Caller, typename... Args>
218 Future<typename isFuture<R>::Inner>
219 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
220 typedef typename std::remove_cv<
221 typename std::remove_reference<
222 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
223 return then([instance, func](Try<T>&& t){
224 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
231 template <class... Args>
232 auto Future<T>::then(Executor* x, Args&&... args)
233 -> decltype(this->then(std::forward<Args>(args)...))
235 auto oldX = getExecutor();
237 return this->then(std::forward<Args>(args)...).via(oldX);
242 Future<void> Future<T>::then() {
243 return then([] (Try<T>&& t) {});
246 // onError where the callback returns T
249 typename std::enable_if<
250 !detail::callableWith<F, exception_wrapper>::value &&
251 !detail::Extract<F>::ReturnsFuture::value,
253 Future<T>::onError(F&& func) {
254 typedef typename detail::Extract<F>::FirstArg Exn;
256 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
257 "Return type of onError callback must be T or Future<T>");
260 auto f = p.getFuture();
261 auto pm = folly::makeMoveWrapper(std::move(p));
262 auto funcm = folly::makeMoveWrapper(std::move(func));
263 setCallback_([pm, funcm](Try<T>&& t) mutable {
264 if (!t.template withException<Exn>([&] (Exn& e) {
269 pm->setTry(std::move(t));
276 // onError where the callback returns Future<T>
279 typename std::enable_if<
280 !detail::callableWith<F, exception_wrapper>::value &&
281 detail::Extract<F>::ReturnsFuture::value,
283 Future<T>::onError(F&& func) {
285 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
286 "Return type of onError callback must be T or Future<T>");
287 typedef typename detail::Extract<F>::FirstArg Exn;
290 auto f = p.getFuture();
291 auto pm = folly::makeMoveWrapper(std::move(p));
292 auto funcm = folly::makeMoveWrapper(std::move(func));
293 setCallback_([pm, funcm](Try<T>&& t) mutable {
294 if (!t.template withException<Exn>([&] (Exn& e) {
296 auto f2 = (*funcm)(e);
297 f2.setCallback_([pm](Try<T>&& t2) mutable {
298 pm->setTry(std::move(t2));
300 } catch (const std::exception& e2) {
301 pm->setException(exception_wrapper(std::current_exception(), e2));
303 pm->setException(exception_wrapper(std::current_exception()));
306 pm->setTry(std::move(t));
315 Future<T> Future<T>::ensure(F func) {
316 MoveWrapper<F> funcw(std::move(func));
317 return this->then([funcw](Try<T>&& t) {
319 return makeFuture(std::move(t));
325 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
326 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
327 return within(dur, tk)
328 .onError([funcw](TimedOut const&) { return (*funcw)(); });
333 typename std::enable_if<
334 detail::callableWith<F, exception_wrapper>::value &&
335 detail::Extract<F>::ReturnsFuture::value,
337 Future<T>::onError(F&& func) {
339 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
340 "Return type of onError callback must be T or Future<T>");
343 auto f = p.getFuture();
344 auto pm = folly::makeMoveWrapper(std::move(p));
345 auto funcm = folly::makeMoveWrapper(std::move(func));
346 setCallback_([pm, funcm](Try<T> t) mutable {
347 if (t.hasException()) {
349 auto f2 = (*funcm)(std::move(t.exception()));
350 f2.setCallback_([pm](Try<T> t2) mutable {
351 pm->setTry(std::move(t2));
353 } catch (const std::exception& e2) {
354 pm->setException(exception_wrapper(std::current_exception(), e2));
356 pm->setException(exception_wrapper(std::current_exception()));
359 pm->setTry(std::move(t));
366 // onError(exception_wrapper) that returns T
369 typename std::enable_if<
370 detail::callableWith<F, exception_wrapper>::value &&
371 !detail::Extract<F>::ReturnsFuture::value,
373 Future<T>::onError(F&& func) {
375 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
376 "Return type of onError callback must be T or Future<T>");
379 auto f = p.getFuture();
380 auto pm = folly::makeMoveWrapper(std::move(p));
381 auto funcm = folly::makeMoveWrapper(std::move(func));
382 setCallback_([pm, funcm](Try<T> t) mutable {
383 if (t.hasException()) {
385 return (*funcm)(std::move(t.exception()));
388 pm->setTry(std::move(t));
396 typename std::add_lvalue_reference<T>::type Future<T>::value() {
399 return core_->getTry().value();
403 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
406 return core_->getTry().value();
410 Try<T>& Future<T>::getTry() {
413 return core_->getTry();
417 Optional<Try<T>> Future<T>::poll() {
419 if (core_->ready()) {
420 o = std::move(core_->getTry());
426 template <typename Executor>
427 inline Future<T> Future<T>::via(Executor* executor) && {
430 setExecutor(executor);
432 return std::move(*this);
436 template <typename Executor>
437 inline Future<T> Future<T>::via(Executor* executor) & {
440 MoveWrapper<Promise<T>> p;
441 auto f = p->getFuture();
442 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
443 return std::move(f).via(executor);
447 bool Future<T>::isReady() const {
449 return core_->ready();
453 void Future<T>::raise(exception_wrapper exception) {
454 core_->raise(std::move(exception));
460 Future<typename std::decay<T>::type> makeFuture(T&& t) {
461 Promise<typename std::decay<T>::type> p;
462 p.setValue(std::forward<T>(t));
463 return p.getFuture();
466 inline // for multiple translation units
467 Future<void> makeFuture() {
470 return p.getFuture();
476 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
477 -> Future<decltype(func())> {
478 Promise<decltype(func())> p;
483 return p.getFuture();
487 auto makeFutureWith(F const& func) -> Future<decltype(func())> {
489 return makeFutureWith(std::move(copy));
493 Future<T> makeFuture(std::exception_ptr const& e) {
496 return p.getFuture();
500 Future<T> makeFuture(exception_wrapper ew) {
502 p.setException(std::move(ew));
503 return p.getFuture();
506 template <class T, class E>
507 typename std::enable_if<std::is_base_of<std::exception, E>::value,
509 makeFuture(E const& e) {
511 p.setException(make_exception_wrapper<E>(e));
512 return p.getFuture();
516 Future<T> makeFuture(Try<T>&& t) {
517 Promise<typename std::decay<T>::type> p;
518 p.setTry(std::move(t));
519 return p.getFuture();
523 inline Future<void> makeFuture(Try<void>&& t) {
524 if (t.hasException()) {
525 return makeFuture<void>(std::move(t.exception()));
532 template <typename Executor>
533 Future<void> via(Executor* executor) {
534 return makeFuture().via(executor);
539 template <typename... Fs>
540 typename detail::VariadicContext<
541 typename std::decay<Fs>::type::value_type...>::type
542 collectAll(Fs&&... fs) {
544 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
545 ctx->total = sizeof...(fs);
546 auto f_saved = ctx->p.getFuture();
547 detail::collectAllVariadicHelper(ctx,
548 std::forward<typename std::decay<Fs>::type>(fs)...);
554 template <class InputIterator>
557 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
558 collectAll(InputIterator first, InputIterator last) {
560 typename std::iterator_traits<InputIterator>::value_type::value_type T;
563 return makeFuture(std::vector<Try<T>>());
565 size_t n = std::distance(first, last);
567 auto ctx = new detail::WhenAllContext<T>();
569 ctx->results.resize(n);
571 auto f_saved = ctx->p.getFuture();
573 for (size_t i = 0; first != last; ++first, ++i) {
576 f.setCallback_([ctx, i, n](Try<T> t) {
577 ctx->results[i] = std::move(t);
578 if (++ctx->count == n) {
579 ctx->p.setValue(std::move(ctx->results));
590 template <class, class, typename = void> struct CollectContextHelper;
592 template <class T, class VecT>
593 struct CollectContextHelper<T, VecT,
594 typename std::enable_if<std::is_same<T, VecT>::value>::type> {
595 static inline std::vector<T>&& getResults(std::vector<VecT>& results) {
596 return std::move(results);
600 template <class T, class VecT>
601 struct CollectContextHelper<T, VecT,
602 typename std::enable_if<!std::is_same<T, VecT>::value>::type> {
603 static inline std::vector<T> getResults(std::vector<VecT>& results) {
604 std::vector<T> finalResults;
605 finalResults.reserve(results.size());
606 for (auto& opt : results) {
607 finalResults.push_back(std::move(opt.value()));
613 template <typename T>
614 struct CollectContext {
616 typedef typename std::conditional<
617 std::is_default_constructible<T>::value,
622 explicit CollectContext(int n) : count(0), threw(false) {
626 Promise<std::vector<T>> p;
627 std::vector<VecT> results;
628 std::atomic<size_t> count;
629 std::atomic_bool threw;
631 typedef std::vector<T> result_type;
633 static inline Future<std::vector<T>> makeEmptyFuture() {
634 return makeFuture(std::vector<T>());
637 inline void setValue() {
638 p.setValue(CollectContextHelper<T, VecT>::getResults(results));
641 inline void addResult(int i, Try<T>& t) {
642 results[i] = std::move(t.value());
647 struct CollectContext<void> {
649 explicit CollectContext(int n) : count(0), threw(false) {}
652 std::atomic<size_t> count;
653 std::atomic_bool threw;
655 typedef void result_type;
657 static inline Future<void> makeEmptyFuture() {
661 inline void setValue() {
665 inline void addResult(int i, Try<void>& t) {
672 template <class InputIterator>
673 Future<typename detail::CollectContext<
674 typename std::iterator_traits<InputIterator>::value_type::value_type
676 collect(InputIterator first, InputIterator last) {
678 typename std::iterator_traits<InputIterator>::value_type::value_type T;
681 return detail::CollectContext<T>::makeEmptyFuture();
684 size_t n = std::distance(first, last);
685 auto ctx = new detail::CollectContext<T>(n);
686 auto f_saved = ctx->p.getFuture();
688 for (size_t i = 0; first != last; ++first, ++i) {
691 f.setCallback_([ctx, i, n](Try<T> t) {
692 auto c = ++ctx->count;
694 if (t.hasException()) {
695 if (!ctx->threw.exchange(true)) {
696 ctx->p.setException(std::move(t.exception()));
698 } else if (!ctx->threw) {
699 ctx->addResult(i, t);
714 template <class InputIterator>
719 std::iterator_traits<InputIterator>::value_type::value_type> > >
720 collectAny(InputIterator first, InputIterator last) {
722 typename std::iterator_traits<InputIterator>::value_type::value_type T;
724 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
725 auto f_saved = ctx->p.getFuture();
727 for (size_t i = 0; first != last; first++, i++) {
729 f.setCallback_([i, ctx](Try<T>&& t) {
730 if (!ctx->done.exchange(true)) {
731 ctx->p.setValue(std::make_pair(i, std::move(t)));
740 template <class InputIterator>
741 Future<std::vector<std::pair<size_t, Try<typename
742 std::iterator_traits<InputIterator>::value_type::value_type>>>>
743 collectN(InputIterator first, InputIterator last, size_t n) {
745 std::iterator_traits<InputIterator>::value_type::value_type T;
746 typedef std::vector<std::pair<size_t, Try<T>>> V;
753 auto ctx = std::make_shared<ctx_t>();
756 // for each completed Future, increase count and add to vector, until we
757 // have n completed futures at which point we fulfill our Promise with the
762 it->then([ctx, n, i](Try<T>&& t) {
764 auto c = ++ctx->completed;
766 assert(ctx->v.size() < n);
767 v.push_back(std::make_pair(i, std::move(t)));
769 ctx->p.setTry(Try<V>(std::move(v)));
779 ctx->p.setException(std::runtime_error("Not enough futures"));
782 return ctx->p.getFuture();
785 template <class It, class T, class F, class ItT, class Arg>
786 typename std::enable_if<!isFutureResult<F, T, Arg>::value, Future<T>>::type
787 reduce(It first, It last, T initial, F func) {
789 return makeFuture(std::move(initial));
792 typedef isTry<Arg> IsTry;
794 return collectAll(first, last)
795 .then([initial, func](std::vector<Try<ItT>>& vals) mutable {
796 for (auto& val : vals) {
797 initial = func(std::move(initial),
798 // Either return a ItT&& or a Try<ItT>&& depending
799 // on the type of the argument of func.
800 val.template get<IsTry::value, Arg&&>());
806 template <class It, class T, class F, class ItT, class Arg>
807 typename std::enable_if<isFutureResult<F, T, Arg>::value, Future<T>>::type
808 reduce(It first, It last, T initial, F func) {
810 return makeFuture(std::move(initial));
813 typedef isTry<Arg> IsTry;
815 auto f = first->then([initial, func](Try<ItT>& head) mutable {
816 return func(std::move(initial),
817 head.template get<IsTry::value, Arg&&>());
820 for (++first; first != last; ++first) {
821 f = collectAll(f, *first).then([func](std::tuple<Try<T>, Try<ItT>>& t) {
822 return func(std::move(std::get<0>(t).value()),
823 // Either return a ItT&& or a Try<ItT>&& depending
824 // on the type of the argument of func.
825 std::get<1>(t).template get<IsTry::value, Arg&&>());
833 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
834 return within(dur, TimedOut(), tk);
839 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
842 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
845 std::atomic<bool> token;
847 auto ctx = std::make_shared<Context>(std::move(e));
850 tk = folly::detail::getTimekeeperSingleton();
854 .then([ctx](Try<void> const& t) {
855 if (ctx->token.exchange(true) == false) {
856 if (t.hasException()) {
857 ctx->promise.setException(std::move(t.exception()));
859 ctx->promise.setException(std::move(ctx->exception));
864 this->then([ctx](Try<T>&& t) {
865 if (ctx->token.exchange(true) == false) {
866 ctx->promise.setTry(std::move(t));
870 return ctx->promise.getFuture();
874 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
875 return collectAll(*this, futures::sleep(dur, tk))
876 .then([](std::tuple<Try<T>, Try<void>> tup) {
877 Try<T>& t = std::get<0>(tup);
878 return makeFuture<T>(std::move(t));
885 void waitImpl(Future<T>& f) {
886 // short-circuit if there's nothing to do
887 if (f.isReady()) return;
889 folly::fibers::Baton baton;
890 f = f.then([&](Try<T> t) {
892 return makeFuture(std::move(t));
896 // There's a race here between the return here and the actual finishing of
897 // the future. f is completed, but the setup may not have finished on done
898 // after the baton has posted.
899 while (!f.isReady()) {
900 std::this_thread::yield();
905 void waitImpl(Future<T>& f, Duration dur) {
906 // short-circuit if there's nothing to do
907 if (f.isReady()) return;
909 auto baton = std::make_shared<folly::fibers::Baton>();
910 f = f.then([baton](Try<T> t) {
912 return makeFuture(std::move(t));
915 // Let's preserve the invariant that if we did not timeout (timed_wait returns
916 // true), then the returned Future is complete when it is returned to the
917 // caller. We need to wait out the race for that Future to complete.
918 if (baton->timed_wait(dur)) {
919 while (!f.isReady()) {
920 std::this_thread::yield();
926 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
927 while (!f.isReady()) {
935 Future<T>& Future<T>::wait() & {
936 detail::waitImpl(*this);
941 Future<T>&& Future<T>::wait() && {
942 detail::waitImpl(*this);
943 return std::move(*this);
947 Future<T>& Future<T>::wait(Duration dur) & {
948 detail::waitImpl(*this, dur);
953 Future<T>&& Future<T>::wait(Duration dur) && {
954 detail::waitImpl(*this, dur);
955 return std::move(*this);
959 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
960 detail::waitViaImpl(*this, e);
965 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
966 detail::waitViaImpl(*this, e);
967 return std::move(*this);
972 return std::move(wait().value());
976 inline void Future<void>::get() {
981 T Future<T>::get(Duration dur) {
984 return std::move(value());
991 inline void Future<void>::get(Duration dur) {
1001 T Future<T>::getVia(DrivableExecutor* e) {
1002 return std::move(waitVia(e).value());
1006 inline void Future<void>::getVia(DrivableExecutor* e) {
1011 Future<bool> Future<T>::willEqual(Future<T>& f) {
1012 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1013 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1014 return std::get<0>(t).value() == std::get<1>(t).value();
1023 Future<T> Future<T>::filter(F predicate) {
1024 auto p = folly::makeMoveWrapper(std::move(predicate));
1025 return this->then([p](T val) {
1026 T const& valConstRef = val;
1027 if (!(*p)(valConstRef)) {
1028 throw PredicateDoesNotObtain();
1037 Future<Z> chainHelper(Future<Z> f) {
1041 template <class Z, class F, class Fn, class... Callbacks>
1042 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
1043 return chainHelper<Z>(f.then(fn), fns...);
1047 template <class A, class Z, class... Callbacks>
1048 std::function<Future<Z>(Try<A>)>
1049 chain(Callbacks... fns) {
1050 MoveWrapper<Promise<A>> pw;
1051 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
1052 return [=](Try<A> t) mutable {
1053 pw->setTry(std::move(t));
1054 return std::move(*fw);
1058 template <class It, class F, class ItT, class Result>
1059 std::vector<Future<Result>> map(It first, It last, F func) {
1060 std::vector<Future<Result>> results;
1061 for (auto it = first; it != last; it++) {
1062 results.push_back(it->then(func));
1068 } // namespace folly
1070 // I haven't included a Future<T&> specialization because I don't forsee us
1071 // using it, however it is not difficult to add when needed. Refer to
1072 // Future<void> for guidance. std::future and boost::future code would also be