2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
32 Timekeeper* getTimekeeperSingleton();
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37 other.core_ = nullptr;
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42 std::swap(core_, other.core_);
49 const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
53 *this = p.getFuture();
59 typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
62 p.setValue(std::forward<F>(val));
63 *this = p.getFuture();
68 typename std::enable_if<std::is_void<F>::value, int>::type>
69 Future<void>::Future() : core_(nullptr) {
72 *this = p.getFuture();
77 Future<T>::~Future() {
82 void Future<T>::detach() {
84 core_->detachFuture();
90 void Future<T>::throwIfInvalid() const {
97 void Future<T>::setCallback_(F&& func) {
99 core_->setCallback(std::move(func));
106 typename std::enable_if<isFuture<F>::value,
107 Future<typename isFuture<T>::Inner>>::type
108 Future<T>::unwrap() {
109 return then([](Future<typename isFuture<T>::Inner> internal_future) {
110 return internal_future;
116 // Variant: returns a value
117 // e.g. f.then([](Try<T>&& t){ return t.value(); });
119 template <typename F, typename R, bool isTry, typename... Args>
120 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
121 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
122 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
123 typedef typename R::ReturnsFuture::Inner B;
127 // wrap these so we can move them into the lambda
128 folly::MoveWrapper<Promise<B>> p;
129 folly::MoveWrapper<F> funcm(std::forward<F>(func));
131 // grab the Future now before we lose our handle on the Promise
132 auto f = p->getFuture();
134 f.setExecutor(getExecutor());
137 /* This is a bit tricky.
139 We can't just close over *this in case this Future gets moved. So we
140 make a new dummy Future. We could figure out something more
141 sophisticated that avoids making a new Future object when it can, as an
142 optimization. But this is correct.
144 core_ can't be moved, it is explicitly disallowed (as is copying). But
145 if there's ever a reason to allow it, this is one place that makes that
146 assumption and would need to be fixed. We use a standard shared pointer
147 for core_ (by copying it in), which means in essence obj holds a shared
148 pointer to itself. But this shouldn't leak because Promise will not
149 outlive the continuation, because Promise will setException() with a
150 broken Promise if it is destructed before completed. We could use a
151 weak pointer but it would have to be converted to a shared pointer when
152 func is executed (because the Future returned by func may possibly
153 persist beyond the callback, if it gets moved), and so it is an
154 optimization to just make it shared from the get-go.
156 We have to move in the Promise and func using the MoveWrapper
157 hack. (func could be copied but it's a big drag on perf).
159 Two subtle but important points about this design. detail::Core has no
160 back pointers to Future or Promise, so if Future or Promise get moved
161 (and they will be moved in performant code) we don't have to do
162 anything fancy. And because we store the continuation in the
163 detail::Core, not in the Future, we can execute the continuation even
164 after the Future has gone out of scope. This is an intentional design
165 decision. It is likely we will want to be able to cancel a continuation
166 in some circumstances, but I think it should be explicit not implicit
167 in the destruction of the Future used to create it.
170 [p, funcm](Try<T>&& t) mutable {
171 if (!isTry && t.hasException()) {
172 p->setException(std::move(t.exception()));
175 return (*funcm)(t.template get<isTry, Args>()...);
183 // Variant: returns a Future
184 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
186 template <typename F, typename R, bool isTry, typename... Args>
187 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
188 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
189 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
190 typedef typename R::ReturnsFuture::Inner B;
194 // wrap these so we can move them into the lambda
195 folly::MoveWrapper<Promise<B>> p;
196 folly::MoveWrapper<F> funcm(std::forward<F>(func));
198 // grab the Future now before we lose our handle on the Promise
199 auto f = p->getFuture();
201 f.setExecutor(getExecutor());
205 [p, funcm](Try<T>&& t) mutable {
206 if (!isTry && t.hasException()) {
207 p->setException(std::move(t.exception()));
210 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
211 // that didn't throw, now we can steal p
212 f2.setCallback_([p](Try<B>&& b) mutable {
213 p->setTry(std::move(b));
215 } catch (const std::exception& e) {
216 p->setException(exception_wrapper(std::current_exception(), e));
218 p->setException(exception_wrapper(std::current_exception()));
226 template <typename T>
227 template <typename R, typename Caller, typename... Args>
228 Future<typename isFuture<R>::Inner>
229 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
230 typedef typename std::remove_cv<
231 typename std::remove_reference<
232 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
233 return then([instance, func](Try<T>&& t){
234 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
239 Future<void> Future<T>::then() {
240 return then([] (Try<T>&& t) {});
243 // onError where the callback returns T
246 typename std::enable_if<
247 !detail::callableWith<F, exception_wrapper>::value &&
248 !detail::Extract<F>::ReturnsFuture::value,
250 Future<T>::onError(F&& func) {
251 typedef typename detail::Extract<F>::FirstArg Exn;
253 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
254 "Return type of onError callback must be T or Future<T>");
257 auto f = p.getFuture();
258 auto pm = folly::makeMoveWrapper(std::move(p));
259 auto funcm = folly::makeMoveWrapper(std::move(func));
260 setCallback_([pm, funcm](Try<T>&& t) mutable {
261 if (!t.template withException<Exn>([&] (Exn& e) {
266 pm->setTry(std::move(t));
273 // onError where the callback returns Future<T>
276 typename std::enable_if<
277 !detail::callableWith<F, exception_wrapper>::value &&
278 detail::Extract<F>::ReturnsFuture::value,
280 Future<T>::onError(F&& func) {
282 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
283 "Return type of onError callback must be T or Future<T>");
284 typedef typename detail::Extract<F>::FirstArg Exn;
287 auto f = p.getFuture();
288 auto pm = folly::makeMoveWrapper(std::move(p));
289 auto funcm = folly::makeMoveWrapper(std::move(func));
290 setCallback_([pm, funcm](Try<T>&& t) mutable {
291 if (!t.template withException<Exn>([&] (Exn& e) {
293 auto f2 = (*funcm)(e);
294 f2.setCallback_([pm](Try<T>&& t2) mutable {
295 pm->setTry(std::move(t2));
297 } catch (const std::exception& e2) {
298 pm->setException(exception_wrapper(std::current_exception(), e2));
300 pm->setException(exception_wrapper(std::current_exception()));
303 pm->setTry(std::move(t));
312 Future<T> Future<T>::ensure(F func) {
313 MoveWrapper<F> funcw(std::move(func));
314 return this->then([funcw](Try<T>&& t) {
316 return makeFuture(std::move(t));
322 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
323 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
324 return within(dur, tk)
325 .onError([funcw](TimedOut const&) { return (*funcw)(); });
330 typename std::enable_if<
331 detail::callableWith<F, exception_wrapper>::value &&
332 detail::Extract<F>::ReturnsFuture::value,
334 Future<T>::onError(F&& func) {
336 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
337 "Return type of onError callback must be T or Future<T>");
340 auto f = p.getFuture();
341 auto pm = folly::makeMoveWrapper(std::move(p));
342 auto funcm = folly::makeMoveWrapper(std::move(func));
343 setCallback_([pm, funcm](Try<T> t) mutable {
344 if (t.hasException()) {
346 auto f2 = (*funcm)(std::move(t.exception()));
347 f2.setCallback_([pm](Try<T> t2) mutable {
348 pm->setTry(std::move(t2));
350 } catch (const std::exception& e2) {
351 pm->setException(exception_wrapper(std::current_exception(), e2));
353 pm->setException(exception_wrapper(std::current_exception()));
356 pm->setTry(std::move(t));
363 // onError(exception_wrapper) that returns T
366 typename std::enable_if<
367 detail::callableWith<F, exception_wrapper>::value &&
368 !detail::Extract<F>::ReturnsFuture::value,
370 Future<T>::onError(F&& func) {
372 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
373 "Return type of onError callback must be T or Future<T>");
376 auto f = p.getFuture();
377 auto pm = folly::makeMoveWrapper(std::move(p));
378 auto funcm = folly::makeMoveWrapper(std::move(func));
379 setCallback_([pm, funcm](Try<T> t) mutable {
380 if (t.hasException()) {
382 return (*funcm)(std::move(t.exception()));
385 pm->setTry(std::move(t));
393 typename std::add_lvalue_reference<T>::type Future<T>::value() {
396 return core_->getTry().value();
400 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
403 return core_->getTry().value();
407 Try<T>& Future<T>::getTry() {
410 return core_->getTry();
414 Optional<Try<T>> Future<T>::poll() {
416 if (core_->ready()) {
417 o = std::move(core_->getTry());
423 template <typename Executor>
424 inline Future<T> Future<T>::via(Executor* executor) && {
427 setExecutor(executor);
429 return std::move(*this);
433 template <typename Executor>
434 inline Future<T> Future<T>::via(Executor* executor) & {
437 MoveWrapper<Promise<T>> p;
438 auto f = p->getFuture();
439 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
440 return std::move(f).via(executor);
444 bool Future<T>::isReady() const {
446 return core_->ready();
450 void Future<T>::raise(exception_wrapper exception) {
451 core_->raise(std::move(exception));
457 Future<typename std::decay<T>::type> makeFuture(T&& t) {
458 Promise<typename std::decay<T>::type> p;
459 p.setValue(std::forward<T>(t));
460 return p.getFuture();
463 inline // for multiple translation units
464 Future<void> makeFuture() {
467 return p.getFuture();
473 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
474 -> Future<decltype(func())> {
475 Promise<decltype(func())> p;
480 return p.getFuture();
484 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
486 return makeFutureTry(std::move(copy));
490 Future<T> makeFuture(std::exception_ptr const& e) {
493 return p.getFuture();
497 Future<T> makeFuture(exception_wrapper ew) {
499 p.setException(std::move(ew));
500 return p.getFuture();
503 template <class T, class E>
504 typename std::enable_if<std::is_base_of<std::exception, E>::value,
506 makeFuture(E const& e) {
508 p.setException(make_exception_wrapper<E>(e));
509 return p.getFuture();
513 Future<T> makeFuture(Try<T>&& t) {
514 Promise<typename std::decay<T>::type> p;
515 p.setTry(std::move(t));
516 return p.getFuture();
520 inline Future<void> makeFuture(Try<void>&& t) {
521 if (t.hasException()) {
522 return makeFuture<void>(std::move(t.exception()));
529 template <typename Executor>
530 Future<void> via(Executor* executor) {
531 return makeFuture().via(executor);
536 template <typename... Fs>
537 typename detail::VariadicContext<
538 typename std::decay<Fs>::type::value_type...>::type
539 whenAll(Fs&&... fs) {
541 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
542 ctx->total = sizeof...(fs);
543 auto f_saved = ctx->p.getFuture();
544 detail::whenAllVariadicHelper(ctx,
545 std::forward<typename std::decay<Fs>::type>(fs)...);
551 template <class InputIterator>
554 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
555 whenAll(InputIterator first, InputIterator last) {
557 typename std::iterator_traits<InputIterator>::value_type::value_type T;
560 return makeFuture(std::vector<Try<T>>());
562 size_t n = std::distance(first, last);
564 auto ctx = new detail::WhenAllContext<T>();
566 ctx->results.resize(n);
568 auto f_saved = ctx->p.getFuture();
570 for (size_t i = 0; first != last; ++first, ++i) {
573 f.setCallback_([ctx, i, n](Try<T> t) {
574 ctx->results[i] = std::move(t);
575 if (++ctx->count == n) {
576 ctx->p.setValue(std::move(ctx->results));
587 template <class, class, typename = void> struct CollectContextHelper;
589 template <class T, class VecT>
590 struct CollectContextHelper<T, VecT,
591 typename std::enable_if<std::is_same<T, VecT>::value>::type> {
592 static inline std::vector<T>& getResults(std::vector<VecT>& results) {
597 template <class T, class VecT>
598 struct CollectContextHelper<T, VecT,
599 typename std::enable_if<!std::is_same<T, VecT>::value>::type> {
600 static inline std::vector<T> getResults(std::vector<VecT>& results) {
601 std::vector<T> finalResults;
602 finalResults.reserve(results.size());
603 for (auto& opt : results) {
604 finalResults.push_back(std::move(opt.value()));
610 template <typename T>
611 struct CollectContext {
613 typedef typename std::conditional<
614 std::is_default_constructible<T>::value,
619 explicit CollectContext(int n) : count(0), threw(false) {
623 Promise<std::vector<T>> p;
624 std::vector<VecT> results;
625 std::atomic<size_t> count;
626 std::atomic_bool threw;
628 typedef std::vector<T> result_type;
630 static inline Future<std::vector<T>> makeEmptyFuture() {
631 return makeFuture(std::vector<T>());
634 inline void setValue() {
635 p.setValue(CollectContextHelper<T, VecT>::getResults(results));
638 inline void addResult(int i, Try<T>& t) {
639 results[i] = std::move(t.value());
644 struct CollectContext<void> {
646 explicit CollectContext(int n) : count(0), threw(false) {}
649 std::atomic<size_t> count;
650 std::atomic_bool threw;
652 typedef void result_type;
654 static inline Future<void> makeEmptyFuture() {
658 inline void setValue() {
662 inline void addResult(int i, Try<void>& t) {
669 template <class InputIterator>
670 Future<typename detail::CollectContext<
671 typename std::iterator_traits<InputIterator>::value_type::value_type
673 collect(InputIterator first, InputIterator last) {
675 typename std::iterator_traits<InputIterator>::value_type::value_type T;
678 return detail::CollectContext<T>::makeEmptyFuture();
681 size_t n = std::distance(first, last);
682 auto ctx = new detail::CollectContext<T>(n);
683 auto f_saved = ctx->p.getFuture();
685 for (size_t i = 0; first != last; ++first, ++i) {
688 f.setCallback_([ctx, i, n](Try<T> t) {
689 auto c = ++ctx->count;
691 if (t.hasException()) {
692 if (!ctx->threw.exchange(true)) {
693 ctx->p.setException(std::move(t.exception()));
695 } else if (!ctx->threw) {
696 ctx->addResult(i, t);
711 template <class InputIterator>
716 std::iterator_traits<InputIterator>::value_type::value_type> > >
717 whenAny(InputIterator first, InputIterator last) {
719 typename std::iterator_traits<InputIterator>::value_type::value_type T;
721 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
722 auto f_saved = ctx->p.getFuture();
724 for (size_t i = 0; first != last; first++, i++) {
726 f.setCallback_([i, ctx](Try<T>&& t) {
727 if (!ctx->done.exchange(true)) {
728 ctx->p.setValue(std::make_pair(i, std::move(t)));
737 template <class InputIterator>
738 Future<std::vector<std::pair<size_t, Try<typename
739 std::iterator_traits<InputIterator>::value_type::value_type>>>>
740 whenN(InputIterator first, InputIterator last, size_t n) {
742 std::iterator_traits<InputIterator>::value_type::value_type T;
743 typedef std::vector<std::pair<size_t, Try<T>>> V;
750 auto ctx = std::make_shared<ctx_t>();
753 // for each completed Future, increase count and add to vector, until we
754 // have n completed futures at which point we fulfill our Promise with the
759 it->then([ctx, n, i](Try<T>&& t) {
761 auto c = ++ctx->completed;
763 assert(ctx->v.size() < n);
764 v.push_back(std::make_pair(i, std::move(t)));
766 ctx->p.setTry(Try<V>(std::move(v)));
776 ctx->p.setException(std::runtime_error("Not enough futures"));
779 return ctx->p.getFuture();
782 template <class It, class T, class F, class ItT, class Arg>
783 typename std::enable_if<!isFutureResult<F, T, Arg>::value, Future<T>>::type
784 reduce(It first, It last, T initial, F func) {
786 return makeFuture(std::move(initial));
789 typedef isTry<Arg> IsTry;
791 return whenAll(first, last)
792 .then([initial, func](std::vector<Try<ItT>>& vals) mutable {
793 for (auto& val : vals) {
794 initial = func(std::move(initial),
795 // Either return a ItT&& or a Try<ItT>&& depending
796 // on the type of the argument of func.
797 val.template get<IsTry::value, Arg&&>());
803 template <class It, class T, class F, class ItT, class Arg>
804 typename std::enable_if<isFutureResult<F, T, Arg>::value, Future<T>>::type
805 reduce(It first, It last, T initial, F func) {
807 return makeFuture(std::move(initial));
810 typedef isTry<Arg> IsTry;
812 auto f = first->then([initial, func](Try<ItT>& head) mutable {
813 return func(std::move(initial),
814 head.template get<IsTry::value, Arg&&>());
817 for (++first; first != last; ++first) {
818 f = whenAll(f, *first).then([func](std::tuple<Try<T>, Try<ItT>>& t) {
819 return func(std::move(std::get<0>(t).value()),
820 // Either return a ItT&& or a Try<ItT>&& depending
821 // on the type of the argument of func.
822 std::get<1>(t).template get<IsTry::value, Arg&&>());
830 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
831 return within(dur, TimedOut(), tk);
836 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
839 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
842 std::atomic<bool> token;
844 auto ctx = std::make_shared<Context>(std::move(e));
847 tk = folly::detail::getTimekeeperSingleton();
851 .then([ctx](Try<void> const& t) {
852 if (ctx->token.exchange(true) == false) {
853 if (t.hasException()) {
854 ctx->promise.setException(std::move(t.exception()));
856 ctx->promise.setException(std::move(ctx->exception));
861 this->then([ctx](Try<T>&& t) {
862 if (ctx->token.exchange(true) == false) {
863 ctx->promise.setTry(std::move(t));
867 return ctx->promise.getFuture();
871 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
872 return whenAll(*this, futures::sleep(dur, tk))
873 .then([](std::tuple<Try<T>, Try<void>> tup) {
874 Try<T>& t = std::get<0>(tup);
875 return makeFuture<T>(std::move(t));
882 void waitImpl(Future<T>& f) {
883 // short-circuit if there's nothing to do
884 if (f.isReady()) return;
887 f = f.then([&](Try<T> t) {
889 return makeFuture(std::move(t));
893 // There's a race here between the return here and the actual finishing of
894 // the future. f is completed, but the setup may not have finished on done
895 // after the baton has posted.
896 while (!f.isReady()) {
897 std::this_thread::yield();
902 void waitImpl(Future<T>& f, Duration dur) {
903 // short-circuit if there's nothing to do
904 if (f.isReady()) return;
906 auto baton = std::make_shared<Baton<>>();
907 f = f.then([baton](Try<T> t) {
909 return makeFuture(std::move(t));
912 // Let's preserve the invariant that if we did not timeout (timed_wait returns
913 // true), then the returned Future is complete when it is returned to the
914 // caller. We need to wait out the race for that Future to complete.
915 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
916 while (!f.isReady()) {
917 std::this_thread::yield();
923 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
924 while (!f.isReady()) {
932 Future<T>& Future<T>::wait() & {
933 detail::waitImpl(*this);
938 Future<T>&& Future<T>::wait() && {
939 detail::waitImpl(*this);
940 return std::move(*this);
944 Future<T>& Future<T>::wait(Duration dur) & {
945 detail::waitImpl(*this, dur);
950 Future<T>&& Future<T>::wait(Duration dur) && {
951 detail::waitImpl(*this, dur);
952 return std::move(*this);
956 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
957 detail::waitViaImpl(*this, e);
962 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
963 detail::waitViaImpl(*this, e);
964 return std::move(*this);
969 return std::move(wait().value());
973 inline void Future<void>::get() {
978 T Future<T>::get(Duration dur) {
981 return std::move(value());
988 inline void Future<void>::get(Duration dur) {
998 T Future<T>::getVia(DrivableExecutor* e) {
999 return std::move(waitVia(e).value());
1003 inline void Future<void>::getVia(DrivableExecutor* e) {
1008 Future<bool> Future<T>::willEqual(Future<T>& f) {
1009 return whenAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1010 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1011 return std::get<0>(t).value() == std::get<1>(t).value();
1020 Future<T> Future<T>::filter(F predicate) {
1021 auto p = folly::makeMoveWrapper(std::move(predicate));
1022 return this->then([p](T val) {
1023 T const& valConstRef = val;
1024 if (!(*p)(valConstRef)) {
1025 throw PredicateDoesNotObtain();
1034 Future<Z> chainHelper(Future<Z> f) {
1038 template <class Z, class F, class Fn, class... Callbacks>
1039 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
1040 return chainHelper<Z>(f.then(fn), fns...);
1044 template <class A, class Z, class... Callbacks>
1045 std::function<Future<Z>(Try<A>)>
1046 chain(Callbacks... fns) {
1047 MoveWrapper<Promise<A>> pw;
1048 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
1049 return [=](Try<A> t) mutable {
1050 pw->setTry(std::move(t));
1051 return std::move(*fw);
1055 template <class It, class F, class ItT, class Result>
1056 std::vector<Future<Result>> map(It first, It last, F func) {
1057 std::vector<Future<Result>> results;
1058 for (auto it = first; it != last; it++) {
1059 results.push_back(it->then(func));
1065 } // namespace folly
1067 // I haven't included a Future<T&> specialization because I don't forsee us
1068 // using it, however it is not difficult to add when needed. Refer to
1069 // Future<void> for guidance. std::future and boost::future code would also be