2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/Baton.h>
23 #include <folly/Optional.h>
24 #include <folly/futures/detail/Core.h>
25 #include <folly/futures/Timekeeper.h>
32 Timekeeper* getTimekeeperSingleton();
36 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
37 other.core_ = nullptr;
41 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
42 std::swap(core_, other.core_);
49 const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
53 *this = p.getFuture();
59 typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
62 p.setValue(std::forward<F>(val));
63 *this = p.getFuture();
68 typename std::enable_if<std::is_void<F>::value, int>::type>
69 Future<void>::Future() : core_(nullptr) {
72 *this = p.getFuture();
77 Future<T>::~Future() {
82 void Future<T>::detach() {
84 core_->detachFuture();
90 void Future<T>::throwIfInvalid() const {
97 void Future<T>::setCallback_(F&& func) {
99 core_->setCallback(std::move(func));
106 typename std::enable_if<isFuture<F>::value,
107 Future<typename isFuture<T>::Inner>>::type
108 Future<T>::unwrap() {
109 return then([](Future<typename isFuture<T>::Inner> internal_future) {
110 return internal_future;
116 // Variant: returns a value
117 // e.g. f.then([](Try<T>&& t){ return t.value(); });
119 template <typename F, typename R, bool isTry, typename... Args>
120 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
121 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
122 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
123 typedef typename R::ReturnsFuture::Inner B;
127 // wrap these so we can move them into the lambda
128 folly::MoveWrapper<Promise<B>> p;
129 folly::MoveWrapper<F> funcm(std::forward<F>(func));
131 // grab the Future now before we lose our handle on the Promise
132 auto f = p->getFuture();
134 f.setExecutor(getExecutor());
137 /* This is a bit tricky.
139 We can't just close over *this in case this Future gets moved. So we
140 make a new dummy Future. We could figure out something more
141 sophisticated that avoids making a new Future object when it can, as an
142 optimization. But this is correct.
144 core_ can't be moved, it is explicitly disallowed (as is copying). But
145 if there's ever a reason to allow it, this is one place that makes that
146 assumption and would need to be fixed. We use a standard shared pointer
147 for core_ (by copying it in), which means in essence obj holds a shared
148 pointer to itself. But this shouldn't leak because Promise will not
149 outlive the continuation, because Promise will setException() with a
150 broken Promise if it is destructed before completed. We could use a
151 weak pointer but it would have to be converted to a shared pointer when
152 func is executed (because the Future returned by func may possibly
153 persist beyond the callback, if it gets moved), and so it is an
154 optimization to just make it shared from the get-go.
156 We have to move in the Promise and func using the MoveWrapper
157 hack. (func could be copied but it's a big drag on perf).
159 Two subtle but important points about this design. detail::Core has no
160 back pointers to Future or Promise, so if Future or Promise get moved
161 (and they will be moved in performant code) we don't have to do
162 anything fancy. And because we store the continuation in the
163 detail::Core, not in the Future, we can execute the continuation even
164 after the Future has gone out of scope. This is an intentional design
165 decision. It is likely we will want to be able to cancel a continuation
166 in some circumstances, but I think it should be explicit not implicit
167 in the destruction of the Future used to create it.
170 [p, funcm](Try<T>&& t) mutable {
171 if (!isTry && t.hasException()) {
172 p->setException(std::move(t.exception()));
175 return (*funcm)(t.template get<isTry, Args>()...);
183 // Variant: returns a Future
184 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
186 template <typename F, typename R, bool isTry, typename... Args>
187 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
188 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
189 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
190 typedef typename R::ReturnsFuture::Inner B;
194 // wrap these so we can move them into the lambda
195 folly::MoveWrapper<Promise<B>> p;
196 folly::MoveWrapper<F> funcm(std::forward<F>(func));
198 // grab the Future now before we lose our handle on the Promise
199 auto f = p->getFuture();
201 f.setExecutor(getExecutor());
205 [p, funcm](Try<T>&& t) mutable {
206 if (!isTry && t.hasException()) {
207 p->setException(std::move(t.exception()));
210 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
211 // that didn't throw, now we can steal p
212 f2.setCallback_([p](Try<B>&& b) mutable {
213 p->setTry(std::move(b));
215 } catch (const std::exception& e) {
216 p->setException(exception_wrapper(std::current_exception(), e));
218 p->setException(exception_wrapper(std::current_exception()));
226 template <typename T>
227 template <typename R, typename Caller, typename... Args>
228 Future<typename isFuture<R>::Inner>
229 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
230 typedef typename std::remove_cv<
231 typename std::remove_reference<
232 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
233 return then([instance, func](Try<T>&& t){
234 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
239 Future<void> Future<T>::then() {
240 return then([] (Try<T>&& t) {});
243 // onError where the callback returns T
246 typename std::enable_if<
247 !detail::callableWith<F, exception_wrapper>::value &&
248 !detail::Extract<F>::ReturnsFuture::value,
250 Future<T>::onError(F&& func) {
251 typedef typename detail::Extract<F>::FirstArg Exn;
253 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
254 "Return type of onError callback must be T or Future<T>");
257 auto f = p.getFuture();
258 auto pm = folly::makeMoveWrapper(std::move(p));
259 auto funcm = folly::makeMoveWrapper(std::move(func));
260 setCallback_([pm, funcm](Try<T>&& t) mutable {
261 if (!t.template withException<Exn>([&] (Exn& e) {
266 pm->setTry(std::move(t));
273 // onError where the callback returns Future<T>
276 typename std::enable_if<
277 !detail::callableWith<F, exception_wrapper>::value &&
278 detail::Extract<F>::ReturnsFuture::value,
280 Future<T>::onError(F&& func) {
282 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
283 "Return type of onError callback must be T or Future<T>");
284 typedef typename detail::Extract<F>::FirstArg Exn;
287 auto f = p.getFuture();
288 auto pm = folly::makeMoveWrapper(std::move(p));
289 auto funcm = folly::makeMoveWrapper(std::move(func));
290 setCallback_([pm, funcm](Try<T>&& t) mutable {
291 if (!t.template withException<Exn>([&] (Exn& e) {
293 auto f2 = (*funcm)(e);
294 f2.setCallback_([pm](Try<T>&& t2) mutable {
295 pm->setTry(std::move(t2));
297 } catch (const std::exception& e2) {
298 pm->setException(exception_wrapper(std::current_exception(), e2));
300 pm->setException(exception_wrapper(std::current_exception()));
303 pm->setTry(std::move(t));
312 Future<T> Future<T>::ensure(F func) {
313 MoveWrapper<F> funcw(std::move(func));
314 return this->then([funcw](Try<T>&& t) {
316 return makeFuture(std::move(t));
322 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
323 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
324 return within(dur, tk)
325 .onError([funcw](TimedOut const&) { return (*funcw)(); });
330 typename std::enable_if<
331 detail::callableWith<F, exception_wrapper>::value &&
332 detail::Extract<F>::ReturnsFuture::value,
334 Future<T>::onError(F&& func) {
336 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
337 "Return type of onError callback must be T or Future<T>");
340 auto f = p.getFuture();
341 auto pm = folly::makeMoveWrapper(std::move(p));
342 auto funcm = folly::makeMoveWrapper(std::move(func));
343 setCallback_([pm, funcm](Try<T> t) mutable {
344 if (t.hasException()) {
346 auto f2 = (*funcm)(std::move(t.exception()));
347 f2.setCallback_([pm](Try<T> t2) mutable {
348 pm->setTry(std::move(t2));
350 } catch (const std::exception& e2) {
351 pm->setException(exception_wrapper(std::current_exception(), e2));
353 pm->setException(exception_wrapper(std::current_exception()));
356 pm->setTry(std::move(t));
363 // onError(exception_wrapper) that returns T
366 typename std::enable_if<
367 detail::callableWith<F, exception_wrapper>::value &&
368 !detail::Extract<F>::ReturnsFuture::value,
370 Future<T>::onError(F&& func) {
372 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
373 "Return type of onError callback must be T or Future<T>");
376 auto f = p.getFuture();
377 auto pm = folly::makeMoveWrapper(std::move(p));
378 auto funcm = folly::makeMoveWrapper(std::move(func));
379 setCallback_([pm, funcm](Try<T> t) mutable {
380 if (t.hasException()) {
382 return (*funcm)(std::move(t.exception()));
385 pm->setTry(std::move(t));
393 typename std::add_lvalue_reference<T>::type Future<T>::value() {
396 return core_->getTry().value();
400 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
403 return core_->getTry().value();
407 Try<T>& Future<T>::getTry() {
410 return core_->getTry();
414 Optional<Try<T>> Future<T>::poll() {
416 if (core_->ready()) {
417 o = std::move(core_->getTry());
423 template <typename Executor>
424 inline Future<T> Future<T>::via(Executor* executor) && {
427 setExecutor(executor);
429 return std::move(*this);
433 template <typename Executor>
434 inline Future<T> Future<T>::via(Executor* executor) & {
437 MoveWrapper<Promise<T>> p;
438 auto f = p->getFuture();
439 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
440 return std::move(f).via(executor);
444 bool Future<T>::isReady() const {
446 return core_->ready();
450 void Future<T>::raise(exception_wrapper exception) {
451 core_->raise(std::move(exception));
457 Future<typename std::decay<T>::type> makeFuture(T&& t) {
458 Promise<typename std::decay<T>::type> p;
459 p.setValue(std::forward<T>(t));
460 return p.getFuture();
463 inline // for multiple translation units
464 Future<void> makeFuture() {
467 return p.getFuture();
473 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
474 -> Future<decltype(func())> {
475 Promise<decltype(func())> p;
480 return p.getFuture();
484 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
486 return makeFutureTry(std::move(copy));
490 Future<T> makeFuture(std::exception_ptr const& e) {
493 return p.getFuture();
497 Future<T> makeFuture(exception_wrapper ew) {
499 p.setException(std::move(ew));
500 return p.getFuture();
503 template <class T, class E>
504 typename std::enable_if<std::is_base_of<std::exception, E>::value,
506 makeFuture(E const& e) {
508 p.setException(make_exception_wrapper<E>(e));
509 return p.getFuture();
513 Future<T> makeFuture(Try<T>&& t) {
514 Promise<typename std::decay<T>::type> p;
515 p.setTry(std::move(t));
516 return p.getFuture();
520 inline Future<void> makeFuture(Try<void>&& t) {
521 if (t.hasException()) {
522 return makeFuture<void>(std::move(t.exception()));
529 template <typename Executor>
530 Future<void> via(Executor* executor) {
531 return makeFuture().via(executor);
536 template <typename... Fs>
537 typename detail::VariadicContext<
538 typename std::decay<Fs>::type::value_type...>::type
539 whenAll(Fs&&... fs) {
541 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
542 ctx->total = sizeof...(fs);
543 auto f_saved = ctx->p.getFuture();
544 detail::whenAllVariadicHelper(ctx,
545 std::forward<typename std::decay<Fs>::type>(fs)...);
551 template <class InputIterator>
554 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
555 whenAll(InputIterator first, InputIterator last) {
557 typename std::iterator_traits<InputIterator>::value_type::value_type T;
560 return makeFuture(std::vector<Try<T>>());
562 size_t n = std::distance(first, last);
564 auto ctx = new detail::WhenAllContext<T>();
566 ctx->results.resize(n);
568 auto f_saved = ctx->p.getFuture();
570 for (size_t i = 0; first != last; ++first, ++i) {
573 f.setCallback_([ctx, i, n](Try<T> t) {
574 ctx->results[i] = std::move(t);
575 if (++ctx->count == n) {
576 ctx->p.setValue(std::move(ctx->results));
587 template <typename T>
588 struct CollectContext {
589 explicit CollectContext(int n) : count(0), threw(false) {}
591 Promise<std::vector<T>> p;
592 std::vector<T> results;
593 std::atomic<size_t> count;
594 std::atomic_bool threw;
596 typedef std::vector<T> result_type;
598 static inline Future<std::vector<T>> makeEmptyFuture() {
599 return makeFuture(std::vector<T>());
602 inline void setValue() {
603 p.setValue(std::move(results));
606 inline void addResult(int i, Try<T>& t) {
607 results[i] = std::move(t.value());
612 struct CollectContext<void> {
613 explicit CollectContext(int n) : count(0), threw(false) {}
615 std::atomic<size_t> count;
616 std::atomic_bool threw;
618 typedef void result_type;
620 static inline Future<void> makeEmptyFuture() {
624 inline void setValue() {
628 inline void addResult(int i, Try<void>& t) {
635 template <class InputIterator>
636 Future<typename detail::CollectContext<
637 typename std::iterator_traits<InputIterator>::value_type::value_type
639 collect(InputIterator first, InputIterator last) {
641 typename std::iterator_traits<InputIterator>::value_type::value_type T;
644 return detail::CollectContext<T>::makeEmptyFuture();
647 size_t n = std::distance(first, last);
648 auto ctx = new detail::CollectContext<T>(n);
649 auto f_saved = ctx->p.getFuture();
651 for (size_t i = 0; first != last; ++first, ++i) {
654 f.setCallback_([ctx, i, n](Try<T> t) {
655 auto c = ++ctx->count;
657 if (t.hasException()) {
658 if (!ctx->threw.exchange(true)) {
659 ctx->p.setException(std::move(t.exception()));
661 } else if (!ctx->threw) {
662 ctx->addResult(i, t);
677 template <class InputIterator>
682 std::iterator_traits<InputIterator>::value_type::value_type> > >
683 whenAny(InputIterator first, InputIterator last) {
685 typename std::iterator_traits<InputIterator>::value_type::value_type T;
687 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
688 auto f_saved = ctx->p.getFuture();
690 for (size_t i = 0; first != last; first++, i++) {
692 f.setCallback_([i, ctx](Try<T>&& t) {
693 if (!ctx->done.exchange(true)) {
694 ctx->p.setValue(std::make_pair(i, std::move(t)));
703 template <class InputIterator>
704 Future<std::vector<std::pair<size_t, Try<typename
705 std::iterator_traits<InputIterator>::value_type::value_type>>>>
706 whenN(InputIterator first, InputIterator last, size_t n) {
708 std::iterator_traits<InputIterator>::value_type::value_type T;
709 typedef std::vector<std::pair<size_t, Try<T>>> V;
716 auto ctx = std::make_shared<ctx_t>();
719 // for each completed Future, increase count and add to vector, until we
720 // have n completed futures at which point we fulfill our Promise with the
725 it->then([ctx, n, i](Try<T>&& t) {
727 auto c = ++ctx->completed;
729 assert(ctx->v.size() < n);
730 v.push_back(std::make_pair(i, std::move(t)));
732 ctx->p.setTry(Try<V>(std::move(v)));
742 ctx->p.setException(std::runtime_error("Not enough futures"));
745 return ctx->p.getFuture();
748 template <class It, class T, class F, class ItT, class Arg>
749 typename std::enable_if<!isFutureResult<F, T, Arg>::value, Future<T>>::type
750 reduce(It first, It last, T initial, F func) {
752 return makeFuture(std::move(initial));
755 typedef isTry<Arg> IsTry;
757 return whenAll(first, last)
758 .then([initial, func](std::vector<Try<ItT>>& vals) mutable {
759 for (auto& val : vals) {
760 initial = func(std::move(initial),
761 // Either return a ItT&& or a Try<ItT>&& depending
762 // on the type of the argument of func.
763 val.template get<IsTry::value, Arg&&>());
769 template <class It, class T, class F, class ItT, class Arg>
770 typename std::enable_if<isFutureResult<F, T, Arg>::value, Future<T>>::type
771 reduce(It first, It last, T initial, F func) {
773 return makeFuture(std::move(initial));
776 typedef isTry<Arg> IsTry;
778 auto f = first->then([initial, func](Try<ItT>& head) mutable {
779 return func(std::move(initial),
780 head.template get<IsTry::value, Arg&&>());
783 for (++first; first != last; ++first) {
784 f = whenAll(f, *first).then([func](std::tuple<Try<T>, Try<ItT>>& t) {
785 return func(std::move(std::get<0>(t).value()),
786 // Either return a ItT&& or a Try<ItT>&& depending
787 // on the type of the argument of func.
788 std::get<1>(t).template get<IsTry::value, Arg&&>());
796 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
797 return within(dur, TimedOut(), tk);
802 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
805 Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
808 std::atomic<bool> token;
810 auto ctx = std::make_shared<Context>(std::move(e));
813 tk = folly::detail::getTimekeeperSingleton();
817 .then([ctx](Try<void> const& t) {
818 if (ctx->token.exchange(true) == false) {
819 if (t.hasException()) {
820 ctx->promise.setException(std::move(t.exception()));
822 ctx->promise.setException(std::move(ctx->exception));
827 this->then([ctx](Try<T>&& t) {
828 if (ctx->token.exchange(true) == false) {
829 ctx->promise.setTry(std::move(t));
833 return ctx->promise.getFuture();
837 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
838 return whenAll(*this, futures::sleep(dur, tk))
839 .then([](std::tuple<Try<T>, Try<void>> tup) {
840 Try<T>& t = std::get<0>(tup);
841 return makeFuture<T>(std::move(t));
848 void waitImpl(Future<T>& f) {
849 // short-circuit if there's nothing to do
850 if (f.isReady()) return;
853 f = f.then([&](Try<T> t) {
855 return makeFuture(std::move(t));
859 // There's a race here between the return here and the actual finishing of
860 // the future. f is completed, but the setup may not have finished on done
861 // after the baton has posted.
862 while (!f.isReady()) {
863 std::this_thread::yield();
868 void waitImpl(Future<T>& f, Duration dur) {
869 // short-circuit if there's nothing to do
870 if (f.isReady()) return;
872 auto baton = std::make_shared<Baton<>>();
873 f = f.then([baton](Try<T> t) {
875 return makeFuture(std::move(t));
878 // Let's preserve the invariant that if we did not timeout (timed_wait returns
879 // true), then the returned Future is complete when it is returned to the
880 // caller. We need to wait out the race for that Future to complete.
881 if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
882 while (!f.isReady()) {
883 std::this_thread::yield();
889 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
890 while (!f.isReady()) {
898 Future<T>& Future<T>::wait() & {
899 detail::waitImpl(*this);
904 Future<T>&& Future<T>::wait() && {
905 detail::waitImpl(*this);
906 return std::move(*this);
910 Future<T>& Future<T>::wait(Duration dur) & {
911 detail::waitImpl(*this, dur);
916 Future<T>&& Future<T>::wait(Duration dur) && {
917 detail::waitImpl(*this, dur);
918 return std::move(*this);
922 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
923 detail::waitViaImpl(*this, e);
928 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
929 detail::waitViaImpl(*this, e);
930 return std::move(*this);
935 return std::move(wait().value());
939 inline void Future<void>::get() {
944 T Future<T>::get(Duration dur) {
947 return std::move(value());
954 inline void Future<void>::get(Duration dur) {
964 T Future<T>::getVia(DrivableExecutor* e) {
965 return std::move(waitVia(e).value());
969 inline void Future<void>::getVia(DrivableExecutor* e) {
974 Future<bool> Future<T>::willEqual(Future<T>& f) {
975 return whenAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
976 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
977 return std::get<0>(t).value() == std::get<1>(t).value();
986 Future<T> Future<T>::filter(F predicate) {
987 auto p = folly::makeMoveWrapper(std::move(predicate));
988 return this->then([p](T val) {
989 T const& valConstRef = val;
990 if (!(*p)(valConstRef)) {
991 throw PredicateDoesNotObtain();
1000 Future<Z> chainHelper(Future<Z> f) {
1004 template <class Z, class F, class Fn, class... Callbacks>
1005 Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
1006 return chainHelper<Z>(f.then(fn), fns...);
1010 template <class A, class Z, class... Callbacks>
1011 std::function<Future<Z>(Try<A>)>
1012 chain(Callbacks... fns) {
1013 MoveWrapper<Promise<A>> pw;
1014 MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
1015 return [=](Try<A> t) mutable {
1016 pw->setTry(std::move(t));
1017 return std::move(*fw);
1023 } // namespace folly
1025 // I haven't included a Future<T&> specialization because I don't forsee us
1026 // using it, however it is not difficult to add when needed. Refer to
1027 // Future<void> for guidance. std::future and boost::future code would also be