2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/experimental/fibers/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
// Declaration only; the definition is not in this file. Future<T>::within()
// assigns the returned pointer to its Timekeeper* parameter.
36 Timekeeper* getTimekeeperSingleton();
// Move constructor: takes over other's core_ pointer and leaves other with a
// null core_.
40 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
41 other.core_ = nullptr;
// Move assignment: exchanges core_ pointers with other via std::swap, so the
// core previously held by *this ends up in other.
45 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
46 std::swap(core_, other.core_);
// Value constructor: val is forwarded into a Try<T>, which initializes a
// newly allocated detail::Core<T> owned through core_.
51 template <class T2, typename>
52 Future<T>::Future(T2&& val)
53 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Constructor variant that initializes the core from a value-initialized T().
// NOTE(review): the two unnamed template parameters are presumably SFINAE
// constraints declared with the class — confirm in the class definition.
56 template <typename, typename>
58 : core_(new detail::Core<T>(Try<T>(T()))) {}
// Destructor.
// NOTE(review): presumably releases the core through detach() — confirm.
61 Future<T>::~Future() {
// Notifies the core that the Future side is letting go of it by calling
// Core::detachFuture().
// NOTE(review): presumably guarded by a null check on core_ — confirm.
66 void Future<T>::detach() {
68 core_->detachFuture();
// Const validity check.
// NOTE(review): the name suggests it throws when this Future has no core_ —
// confirm against the body.
74 void Future<T>::throwIfInvalid() const {
// Installs func as the core's callback by moving it into Core::setCallback().
81 void Future<T>::setCallback_(F&& func) {
83 core_->setCallback(std::move(func));
// Enabled only when isFuture<F>::value holds. Chains a continuation that
// receives the inner Future and returns it unchanged, so the result type is
// Future<isFuture<T>::Inner>.
90 typename std::enable_if<isFuture<F>::value,
91 Future<typename isFuture<T>::Inner>>::type
93 return then([](Future<typename isFuture<T>::Inner> internal_future) {
94 return internal_future;
// Continuation plumbing for callbacks that return a plain value (selected
// when R::ReturnsFuture::value is false). B is the value type of the Promise
// created here.
100 // Variant: returns a value
101 // e.g. f.then([](Try<T>&& t){ return t.value(); });
103 template <typename F, typename R, bool isTry, typename... Args>
104 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
105 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
// The callback may declare at most one parameter.
106 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
107 typedef typename R::ReturnsFuture::Inner B;
111 // wrap these so we can move them into the lambda
112 folly::MoveWrapper<Promise<B>> p;
// The new promise's core takes this Future's interrupt handler.
113 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
114 folly::MoveWrapper<F> funcm(std::forward<F>(func));
116 // grab the Future now before we lose our handle on the Promise
117 auto f = p->getFuture();
// The new Future's core takes this Future's executor.
118 f.core_->setExecutorNoLock(getExecutor());
120 /* This is a bit tricky.
122 We can't just close over *this in case this Future gets moved. So we
123 make a new dummy Future. We could figure out something more
124 sophisticated that avoids making a new Future object when it can, as an
125 optimization. But this is correct.
127 core_ can't be moved, it is explicitly disallowed (as is copying). But
128 if there's ever a reason to allow it, this is one place that makes that
129 assumption and would need to be fixed. We use a standard shared pointer
130 for core_ (by copying it in), which means in essence obj holds a shared
131 pointer to itself. But this shouldn't leak because Promise will not
132 outlive the continuation, because Promise will setException() with a
133 broken Promise if it is destructed before completed. We could use a
134 weak pointer but it would have to be converted to a shared pointer when
135 func is executed (because the Future returned by func may possibly
136 persist beyond the callback, if it gets moved), and so it is an
137 optimization to just make it shared from the get-go.
139 We have to move in the Promise and func using the MoveWrapper
140 hack. (func could be copied but it's a big drag on perf).
142 Two subtle but important points about this design. detail::Core has no
143 back pointers to Future or Promise, so if Future or Promise get moved
144 (and they will be moved in performant code) we don't have to do
145 anything fancy. And because we store the continuation in the
146 detail::Core, not in the Future, we can execute the continuation even
147 after the Future has gone out of scope. This is an intentional design
148 decision. It is likely we will want to be able to cancel a continuation
149 in some circumstances, but I think it should be explicit not implicit
150 in the destruction of the Future used to create it.
// Continuation: when func does not take a Try (!isTry) and t holds an
// exception, that exception is passed to the promise. The visible return
// statement invokes func with t.get<isTry, Args>()...
// NOTE(review): presumably the returned value is what fulfils the promise —
// confirm.
153 [p, funcm](Try<T>&& t) mutable {
154 if (!isTry && t.hasException()) {
155 p->setException(std::move(t.exception()));
158 return (*funcm)(t.template get<isTry, Args>()...);
// Continuation plumbing for callbacks that themselves return a Future
// (selected when R::ReturnsFuture::value is true). B is the value type of the
// Promise created here.
166 // Variant: returns a Future
167 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
169 template <typename F, typename R, bool isTry, typename... Args>
170 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
171 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
// The callback may declare at most one parameter.
172 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
173 typedef typename R::ReturnsFuture::Inner B;
177 // wrap these so we can move them into the lambda
178 folly::MoveWrapper<Promise<B>> p;
// The new promise's core takes this Future's interrupt handler.
179 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
180 folly::MoveWrapper<F> funcm(std::forward<F>(func));
182 // grab the Future now before we lose our handle on the Promise
183 auto f = p->getFuture();
// The new Future's core takes this Future's executor.
184 f.core_->setExecutorNoLock(getExecutor());
// Continuation: when func does not take a Try (!isTry) and t holds an
// exception, that exception is passed to the promise. Otherwise func is
// invoked and a callback on the Future it returns (f2) forwards f2's Try into
// the promise. Exceptions caught here are stored in the promise as
// exception_wrappers.
187 [p, funcm](Try<T>&& t) mutable {
188 if (!isTry && t.hasException()) {
189 p->setException(std::move(t.exception()));
192 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
193 // that didn't throw, now we can steal p
194 f2.setCallback_([p](Try<B>&& b) mutable {
195 p->setTry(std::move(b));
197 } catch (const std::exception& e) {
198 p->setException(exception_wrapper(std::current_exception(), e));
200 p->setException(exception_wrapper(std::current_exception()));
// then() overload for a pointer-to-member function: chains a continuation
// that calls (instance->*func) with the completed Try<T>. FirstArg is the
// member function's first parameter type with reference and cv qualifiers
// removed; isTry<FirstArg> is the flag given to Try::get.
// NOTE(review): presumably that flag decides whether the Try itself or its
// value is passed — confirm against Try::get.
// The continuation captures the raw instance pointer by value.
208 template <typename T>
209 template <typename R, typename Caller, typename... Args>
210 Future<typename isFuture<R>::Inner>
211 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
212 typedef typename std::remove_cv<
213 typename std::remove_reference<
214 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
215 return then([instance, func](Try<T>&& t){
216 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// then() overload that takes an executor first. Saves the current executor in
// oldX and forwards arg/args to the regular then(); the return type is
// whatever that then() call yields.
// NOTE(review): presumably the continuation runs on x and oldX is restored on
// the returned Future — confirm against the full body.
221 template <class Executor, class Arg, class... Args>
222 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
223 -> decltype(this->then(std::forward<Arg>(arg),
224 std::forward<Args>(args)...))
226 auto oldX = getExecutor();
228 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
// Argument-less then(): chains an empty callback, yielding a Future<Unit>.
233 Future<Unit> Future<T>::then() {
234 return then([] () {});
// Selected when F is not callable with an exception_wrapper and does not
// return a Future. Exn is the callback's first parameter type, and the
// callback's raw return type must be exactly T.
237 // onError where the callback returns T
240 typename std::enable_if<
241 !detail::callableWith<F, exception_wrapper>::value &&
242 !detail::Extract<F>::ReturnsFuture::value,
244 Future<T>::onError(F&& func) {
245 typedef typename detail::Extract<F>::FirstArg Exn;
247 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
248 "Return type of onError callback must be T or Future<T>");
// The new promise's core takes this Future's interrupt handler.
251 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
252 auto f = p.getFuture();
253 auto pm = folly::makeMoveWrapper(std::move(p));
254 auto funcm = folly::makeMoveWrapper(std::move(func));
// Continuation: withException<Exn> dispatches on the stored exception type.
// When it returns false, the original Try is forwarded to the promise
// unchanged.
255 setCallback_([pm, funcm](Try<T>&& t) mutable {
256 if (!t.template withException<Exn>([&] (Exn& e) {
261 pm->setTry(std::move(t));
// Selected when F is not callable with an exception_wrapper and returns a
// Future; that return type must be exactly Future<T>. Exn is the callback's
// first parameter type.
268 // onError where the callback returns Future<T>
271 typename std::enable_if<
272 !detail::callableWith<F, exception_wrapper>::value &&
273 detail::Extract<F>::ReturnsFuture::value,
275 Future<T>::onError(F&& func) {
277 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
278 "Return type of onError callback must be T or Future<T>");
279 typedef typename detail::Extract<F>::FirstArg Exn;
282 auto f = p.getFuture();
283 auto pm = folly::makeMoveWrapper(std::move(p));
284 auto funcm = folly::makeMoveWrapper(std::move(func));
// Continuation: inside the withException<Exn> handler, func is called with
// the exception and a callback on the Future it returns (f2) forwards f2's
// Try into the promise. Exceptions caught in the handler are stored in the
// promise. When withException returns false, the original Try is forwarded
// unchanged.
285 setCallback_([pm, funcm](Try<T>&& t) mutable {
286 if (!t.template withException<Exn>([&] (Exn& e) {
288 auto f2 = (*funcm)(e);
289 f2.setCallback_([pm](Try<T>&& t2) mutable {
290 pm->setTry(std::move(t2));
292 } catch (const std::exception& e2) {
293 pm->setException(exception_wrapper(std::current_exception(), e2));
295 pm->setException(exception_wrapper(std::current_exception()));
298 pm->setTry(std::move(t));
// Chains a continuation that receives the completed Try<T> and returns it
// re-wrapped with makeFuture, so the result carries the same value or
// exception as this Future.
// NOTE(review): func is presumably invoked just before the return — confirm.
307 Future<T> Future<T>::ensure(F func) {
308 MoveWrapper<F> funcw(std::move(func));
309 return this->then([funcw](Try<T>&& t) mutable {
311 return makeFuture(std::move(t));
// Applies within(dur, tk) and attaches an onError handler for TimedOut that
// returns the result of calling func with no arguments.
317 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
318 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
319 return within(dur, tk)
320 .onError([funcw](TimedOut const&) { return (*funcw)(); });
// onError overload selected when F is callable with an exception_wrapper and
// returns a Future; that return type must be exactly Future<T>.
325 typename std::enable_if<
326 detail::callableWith<F, exception_wrapper>::value &&
327 detail::Extract<F>::ReturnsFuture::value,
329 Future<T>::onError(F&& func) {
331 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
332 "Return type of onError callback must be T or Future<T>");
335 auto f = p.getFuture();
336 auto pm = folly::makeMoveWrapper(std::move(p));
337 auto funcm = folly::makeMoveWrapper(std::move(func));
// Continuation (takes the Try by value): on an exception, func is called with
// the moved exception_wrapper and a callback on the Future it returns (f2)
// forwards f2's Try into the promise. Exceptions caught here are stored in
// the promise. The original Try is forwarded by the final setTry.
338 setCallback_([pm, funcm](Try<T> t) mutable {
339 if (t.hasException()) {
341 auto f2 = (*funcm)(std::move(t.exception()));
342 f2.setCallback_([pm](Try<T> t2) mutable {
343 pm->setTry(std::move(t2));
345 } catch (const std::exception& e2) {
346 pm->setException(exception_wrapper(std::current_exception(), e2));
348 pm->setException(exception_wrapper(std::current_exception()));
351 pm->setTry(std::move(t));
// Selected when F is callable with an exception_wrapper and does not return a
// Future. The static_assert compares Extract<F>::Return with Future<T>.
// NOTE(review): Extract<F>::Return is presumably the Future-wrapped form of
// the callback's raw return type — confirm in detail::Extract.
358 // onError(exception_wrapper) that returns T
361 typename std::enable_if<
362 detail::callableWith<F, exception_wrapper>::value &&
363 !detail::Extract<F>::ReturnsFuture::value,
365 Future<T>::onError(F&& func) {
367 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
368 "Return type of onError callback must be T or Future<T>");
371 auto f = p.getFuture();
372 auto pm = folly::makeMoveWrapper(std::move(p));
373 auto funcm = folly::makeMoveWrapper(std::move(func));
// Continuation (takes the Try by value): on an exception, func is called with
// the moved exception_wrapper; the original Try is forwarded by the final
// setTry.
374 setCallback_([pm, funcm](Try<T> t) mutable {
375 if (t.hasException()) {
377 return (*funcm)(std::move(t.exception()));
380 pm->setTry(std::move(t));
// Returns an lvalue reference to the value held by the core's Try, via
// Try::value().
388 typename std::add_lvalue_reference<T>::type Future<T>::value() {
391 return core_->getTry().value();
// Const overload: same access, returning a reference to const T.
395 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
398 return core_->getTry().value();
// Returns a reference to the Try<T> stored in the core.
402 Try<T>& Future<T>::getTry() {
405 return core_->getTry();
// Non-blocking check: when the core is ready, the stored Try is moved into
// the Optional o, leaving the core's Try moved-from.
// NOTE(review): presumably o is returned empty when the core is not ready —
// confirm.
409 Optional<Try<T>> Future<T>::poll() {
411 if (core_->ready()) {
412 o = std::move(core_->getTry());
// Rvalue overload: sets executor and priority on this Future and returns it
// by move.
418 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
421 setExecutor(executor, priority);
423 return std::move(*this);
// Lvalue overload: creates a fresh Promise/Future pair, chains a continuation
// on this Future that forwards the completed Try into the promise, and returns
// the new Future after applying the rvalue via(executor, priority) to it.
427 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
430 MoveWrapper<Promise<T>> p;
431 auto f = p->getFuture();
432 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
433 return std::move(f).via(executor, priority);
// Free function: shorthand for via(x).then(func). The result is a Future of
// the inner type of whatever func() returns.
437 template <class Func>
438 auto via(Executor* x, Func func)
439 -> Future<typename isFuture<decltype(func())>::Inner>
441 // TODO make this actually more performant. :-P #7260175
442 return via(x).then(func);
// Reports whether the core is ready, via Core::ready().
446 bool Future<T>::isReady() const {
448 return core_->ready();
// True when the Try returned by getTry() holds a value.
452 bool Future<T>::hasValue() {
453 return getTry().hasValue();
// True when the Try returned by getTry() holds an exception.
457 bool Future<T>::hasException() {
458 return getTry().hasException();
// Passes the exception to the core via Core::raise(), moving it in.
462 void Future<T>::raise(exception_wrapper exception) {
463 core_->raise(std::move(exception));
// Builds a Future of the decayed type of t by forwarding t into a Try and
// delegating to the Try overload of makeFuture.
469 Future<typename std::decay<T>::type> makeFuture(T&& t) {
470 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Argument-less overload: returns a Future<Unit> built from Unit{}.
473 inline // for multiple translation units
474 Future<Unit> makeFuture() {
475 return makeFuture(Unit{});
// Builds a Future from the Try produced by makeTryWith around a lambda that
// captures func by reference. LiftedResult is func()'s return type passed
// through Unit::Lift.
// NOTE(review): presumably an exception thrown by func ends up in the Try
// rather than propagating — confirm against makeTryWith.
479 auto makeFutureWith(F&& func)
480 -> Future<typename Unit::Lift<decltype(func())>::type> {
481 using LiftedResult = typename Unit::Lift<decltype(func())>::type;
482 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
// Builds a Future<T> from a Try<T> constructed from the exception_ptr e.
488 Future<T> makeFuture(std::exception_ptr const& e) {
489 return makeFuture(Try<T>(e));
// Builds a Future<T> from a Try<T> that takes ownership of ew by move.
493 Future<T> makeFuture(exception_wrapper ew) {
494 return makeFuture(Try<T>(std::move(ew)));
// Overload for exception types derived from std::exception (enforced by the
// enable_if): wraps e with make_exception_wrapper<E> and builds a Future<T>
// from the resulting Try<T>.
497 template <class T, class E>
498 typename std::enable_if<std::is_base_of<std::exception, E>::value,
500 makeFuture(E const& e) {
501 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload: allocates a detail::Core<T> initialized by moving t into it
// and constructs the Future from that core pointer.
505 Future<T> makeFuture(Try<T>&& t) {
506 return Future<T>(new detail::Core<T>(std::move(t)));
// Returns the Future<Unit> from makeFuture() with via(executor, priority)
// applied to it.
510 Future<Unit> via(Executor* executor, int8_t priority) {
511 return makeFuture().via(executor, priority);
// For each future in [first, last), installs a callback that calls
// func(i, Try<T>) once that particular future completes, where i is the
// future's zero-based position in the range. func is copied into every
// callback.
514 // mapSetCallback calls func(i, Try<T>) when every future completes
516 template <class T, class InputIterator, class F>
517 void mapSetCallback(InputIterator first, InputIterator last, F func) {
518 for (size_t i = 0; first != last; ++first, ++i) {
519 first->setCallback_([func, i](Try<T>&& t) {
520 func(i, std::move(t));
// Creates a shared CollectAllVariadicContext parameterized on the value types
// of the input futures, passes it and the inputs to collectVariadicHelper, and
// returns the Future of the context's promise.
525 // collectAll (variadic)
527 template <typename... Fs>
528 typename detail::CollectAllVariadicContext<
529 typename std::decay<Fs>::type::value_type...>::type
530 collectAll(Fs&&... fs) {
531 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
532 typename std::decay<Fs>::type::value_type...>>();
533 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
534 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
535 return ctx->p.getFuture();
// Collects the Try of every future in [first, last) into a vector, indexed by
// position. The shared CollectAllContext fulfils its promise from its
// destructor, and each per-future callback holds a copy of the shared_ptr, so
// the promise is set once the last holder of ctx lets go.
// NOTE(review): the context constructor takes int while it is passed the
// result of std::distance — narrowing for very large ranges; confirm intent.
538 // collectAll (iterator)
540 template <class InputIterator>
543 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
544 collectAll(InputIterator first, InputIterator last) {
546 typename std::iterator_traits<InputIterator>::value_type::value_type T;
548 struct CollectAllContext {
549 CollectAllContext(int n) : results(n) {}
550 ~CollectAllContext() {
551 p.setValue(std::move(results));
553 Promise<std::vector<Try<T>>> p;
554 std::vector<Try<T>> results;
557 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
558 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
559 ctx->results[i] = std::move(t);
561 return ctx->p.getFuture();
// Shared state for the iterator form of collect(). Result and InternalResult
// are chosen with std::conditional on std::is_void<T>; for non-void T they are
// std::vector<T> and std::vector<Optional<T>> respectively.
// NOTE(review): Nothing is presumably the void-case placeholder — confirm.
564 // collect (iterator)
568 template <typename T>
569 struct CollectContext {
570 struct Nothing { explicit Nothing(int n) {} };
572 using Result = typename std::conditional<
573 std::is_void<T>::value,
575 std::vector<T>>::type;
577 using InternalResult = typename std::conditional<
578 std::is_void<T>::value,
580 std::vector<Optional<T>>>::type;
// result is sized to n up front so callbacks can assign by index.
582 explicit CollectContext(int n) : result(n) {}
// When threw was not already set, the Optional<T> slots are unwrapped into a
// std::vector<T> that fulfils the promise.
// NOTE(review): this presumably runs in the destructor — confirm.
584 if (!threw.exchange(true)) {
585 // map Optional<T> -> T
586 std::vector<T> finalResult;
587 finalResult.reserve(result.size());
588 std::transform(result.begin(), result.end(),
589 std::back_inserter(finalResult),
590 [](Optional<T>& o) { return std::move(o.value()); });
591 p.setValue(std::move(finalResult));
// Stores the value of t in slot i by move.
594 inline void setPartialResult(size_t i, Try<T>& t) {
595 result[i] = std::move(t.value());
598 InternalResult result;
// Set by whichever path first claims the promise (exception or final value).
599 std::atomic<bool> threw {false};
// Iterator form of collect(): the first exception seen wins the threw flag and
// is set on the promise; values are stored by index only while threw is still
// false. Returns the Future of the shared context's promise.
604 template <class InputIterator>
605 Future<typename detail::CollectContext<
606 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
607 collect(InputIterator first, InputIterator last) {
609 typename std::iterator_traits<InputIterator>::value_type::value_type T;
611 auto ctx = std::make_shared<detail::CollectContext<T>>(
612 std::distance(first, last));
613 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
614 if (t.hasException()) {
// exchange(true) guarantees only one callback sets the exception.
615 if (!ctx->threw.exchange(true)) {
616 ctx->p.setException(std::move(t.exception()));
618 } else if (!ctx->threw) {
619 ctx->setPartialResult(i, t);
622 return ctx->p.getFuture();
// Creates a shared CollectVariadicContext parameterized on the value types of
// the input futures, passes it and the inputs to collectVariadicHelper, and
// returns the Future of the context's promise.
625 // collect (variadic)
627 template <typename... Fs>
628 typename detail::CollectVariadicContext<
629 typename std::decay<Fs>::type::value_type...>::type
630 collect(Fs&&... fs) {
631 auto ctx = std::make_shared<detail::CollectVariadicContext<
632 typename std::decay<Fs>::type::value_type...>>();
633 detail::collectVariadicHelper<detail::CollectVariadicContext>(
634 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
635 return ctx->p.getFuture();
// Completes with (index, Try) of whichever future in [first, last) finishes
// first: the atomic done flag lets only the first callback set the promise.
638 // collectAny (iterator)
640 template <class InputIterator>
645 std::iterator_traits<InputIterator>::value_type::value_type>>>
646 collectAny(InputIterator first, InputIterator last) {
648 typename std::iterator_traits<InputIterator>::value_type::value_type T;
650 struct CollectAnyContext {
651 CollectAnyContext() {};
652 Promise<std::pair<size_t, Try<T>>> p;
653 std::atomic<bool> done {false};
656 auto ctx = std::make_shared<CollectAnyContext>();
657 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
// exchange(true) returns the previous value, so this is true exactly once.
658 if (!ctx->done.exchange(true)) {
659 ctx->p.setValue(std::make_pair(i, std::move(t)));
662 return ctx->p.getFuture();
// Completes with a vector of (index, Try) pairs taken from futures in
// [first, last) as they finish. When the range holds fewer than n futures,
// the promise is given a std::runtime_error("Not enough futures").
// NOTE(review): presumably recording and fulfilment are gated on the
// completion count c relative to n — confirm against the callback's
// conditions.
665 // collectN (iterator)
667 template <class InputIterator>
668 Future<std::vector<std::pair<size_t, Try<typename
669 std::iterator_traits<InputIterator>::value_type::value_type>>>>
670 collectN(InputIterator first, InputIterator last, size_t n) {
672 std::iterator_traits<InputIterator>::value_type::value_type T;
673 typedef std::vector<std::pair<size_t, Try<T>>> V;
675 struct CollectNContext {
677 std::atomic<size_t> completed = {0};
680 auto ctx = std::make_shared<CollectNContext>();
682 if (size_t(std::distance(first, last)) < n) {
683 ctx->p.setException(std::runtime_error("Not enough futures"));
685 // for each completed Future, increase count and add to vector, until we
686 // have n completed futures at which point we fulfil our Promise with the
688 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
// c is this callback's 1-based completion order.
689 auto c = ++ctx->completed;
691 assert(ctx->v.size() < n);
692 ctx->v.emplace_back(i, std::move(t));
694 ctx->p.setTry(Try<V>(std::move(ctx->v)));
700 return ctx->p.getFuture();
// Folds the futures in [first, last) with func, starting from initial. Arg is
// Try<ItT> when func is callable with (T&&, Try<ItT>&&), otherwise ItT; IsTry
// records which of the two it is. func is held in a shared_ptr so every
// chained continuation can share it.
// NOTE(review): the early return of makeFuture(initial) is presumably the
// empty-range case — confirm.
705 template <class It, class T, class F>
706 Future<T> reduce(It first, It last, T&& initial, F&& func) {
708 return makeFuture(std::move(initial));
711 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
712 typedef typename std::conditional<
713 detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
714 typedef isTry<Arg> IsTry;
716 folly::MoveWrapper<T> minitial(std::move(initial));
717 auto sfunc = std::make_shared<F>(std::move(func));
// First step: combine initial with the first future's result.
719 auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
720 return (*sfunc)(std::move(*minitial),
721 head.template get<IsTry::value, Arg&&>());
// Remaining steps: pair the running result f with the next future through
// collectAll and apply func to both.
724 for (++first; first != last; ++first) {
725 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
726 return (*sfunc)(std::move(std::get<0>(t).value()),
727 // Either return a ItT&& or a Try<ItT>&& depending
728 // on the type of the argument of func.
729 std::get<1>(t).template get<IsTry::value, Arg&&>());
// Applies func to the elements of input with at most min(n, input.size())
// spawn chains started up front, and returns one Future per input element (in
// input order) backed by the context's promises_.
736 // window (collection)
738 template <class Collection, class F, class ItT, class Result>
739 std::vector<Future<Result>>
740 window(Collection input, F func, size_t n) {
741 struct WindowContext {
742 WindowContext(Collection&& i, F&& fn)
743 : input_(std::move(i)), promises_(input_.size()),
// Index of the next input element to launch; incremented atomically.
746 std::atomic<size_t> i_ {0};
748 std::vector<Promise<Result>> promises_;
// Claims the next index. When it is in range, func_ is run on that element
// and, on completion, the matching promise is fulfilled and spawn is called
// again, so each finished element starts the next one.
751 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
752 size_t i = ctx->i_++;
753 if (i < ctx->input_.size()) {
754 // Using setCallback_ directly since we don't need the Future
755 ctx->func_(std::move(ctx->input_[i])).setCallback_(
756 // ctx is captured by value
757 [ctx, i](Try<Result>&& t) {
758 ctx->promises_[i].setTry(std::move(t));
759 // Chain another future onto this one
760 spawn(std::move(ctx));
// max is computed before input is moved into the context.
766 auto max = std::min(n, input.size());
768 auto ctx = std::make_shared<WindowContext>(
769 std::move(input), std::move(func));
771 for (size_t i = 0; i < max; ++i) {
772 // Start the first n Futures
773 WindowContext::spawn(ctx);
776 std::vector<Future<Result>> futures;
777 futures.reserve(ctx->promises_.size());
778 for (auto& promise : ctx->promises_) {
779 futures.emplace_back(promise.getFuture());
// Member reduce for Futures whose value is iterable: once the value is
// available, folds func over its elements starting from initial. Both the
// accumulator and each element are passed to func by move.
788 template <class I, class F>
789 Future<I> Future<T>::reduce(I&& initial, F&& func) {
790 folly::MoveWrapper<I> minitial(std::move(initial));
791 folly::MoveWrapper<F> mfunc(std::move(func));
792 return then([minitial, mfunc](T& vals) mutable {
793 auto ret = std::move(*minitial);
794 for (auto& val : vals) {
795 ret = (*mfunc)(std::move(ret), std::move(val));
// Reduces the futures in [first, last) in completion order rather than input
// order. ctx->memo_ holds the running Future<T>; each completing input, under
// the spinlock, chains one more then() onto memo_ that applies func_. Once
// numFutures_ inputs have chained, memo_'s result is forwarded to promise_.
// NOTE(review): the early return of makeFuture(initial) is presumably the
// empty-range case — confirm.
801 // unorderedReduce (iterator)
803 template <class It, class T, class F, class ItT, class Arg>
804 Future<T> unorderedReduce(It first, It last, T initial, F func) {
806 return makeFuture(std::move(initial));
809 typedef isTry<Arg> IsTry;
811 struct UnorderedReduceContext {
812 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
813 : lock_(), memo_(makeFuture<T>(std::move(memo))),
814 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
816 folly::MicroSpinLock lock_; // protects memo_ and numThens_
819 size_t numThens_; // how many Futures completed and called .then()
820 size_t numFutures_; // how many Futures in total
824 auto ctx = std::make_shared<UnorderedReduceContext>(
825 std::move(initial), std::move(func), std::distance(first, last));
827 mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
828 folly::MoveWrapper<Try<ItT>> mt(std::move(t));
829 // Futures can be completed in any order, simultaneously.
830 // To make this non-blocking, we create a new Future chain in
831 // the order of completion to reduce the values.
832 // The spinlock just protects chaining a new Future, not actually
833 // executing the reduce, which should be really fast.
834 folly::MSLGuard lock(ctx->lock_);
835 ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
836 // Either return a ItT&& or a Try<ItT>&& depending
837 // on the type of the argument of func.
838 return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
840 if (++ctx->numThens_ == ctx->numFutures_) {
841 // After reducing the value of the last Future, fulfill the Promise
842 ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
843 ctx->promise_.setValue(std::move(t2));
848 return ctx->promise_.getFuture();
// Convenience overload: within(dur, e, tk) with a default-constructed
// TimedOut as the exception.
854 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
855 return within(dur, TimedOut(), tk);
// Races this Future against a timer of length dur. The shared Context holds
// the timeout exception, the chained Future, the result promise and an atomic
// token; whichever side flips the token first sets the promise. The result is
// returned via this Future's executor.
// NOTE(review): tk is presumably replaced with the singleton only when the
// caller passed null — confirm.
860 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
863 Context(E ex) : exception(std::move(ex)), promise() {}
865 Future<Unit> thisFuture;
867 std::atomic<bool> token {false};
871 tk = folly::detail::getTimekeeperSingleton();
874 auto ctx = std::make_shared<Context>(std::move(e));
// This Future finished: forward its Try if the timer has not already won.
876 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
877 // TODO: "this" completed first, cancel "after"
878 if (ctx->token.exchange(true) == false) {
879 ctx->promise.setTry(std::move(t));
// Timer fired: raise TimedOut on the chained Future, then, if the token was
// still unset, complete the promise with the timer's own exception when it
// has one, otherwise with the stored exception.
883 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
884 // "after" completed first, cancel "this"
885 ctx->thisFuture.raise(TimedOut());
886 if (ctx->token.exchange(true) == false) {
887 if (t.hasException()) {
888 ctx->promise.setException(std::move(t.exception()));
890 ctx->promise.setException(std::move(ctx->exception));
895 return ctx->promise.getFuture().via(getExecutor());
// Combines this Future with futures::sleep(dur, tk) through collectAll and
// then returns this Future's Try (tuple element 0) re-wrapped as a Future<T>.
// The sleep's own Try is ignored.
901 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
902 return collectAll(*this, futures::sleep(dur, tk))
903 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
904 Try<T>& t = std::get<0>(tup);
905 return makeFuture<T>(std::move(t));
// Blocking wait without a timeout. f is replaced by a chained Future whose
// continuation captures the stack-allocated baton by reference and passes the
// Try through unchanged.
// NOTE(review): presumably the continuation posts the baton and this function
// waits on it before the spin loop — confirm.
912 void waitImpl(Future<T>& f) {
913 // short-circuit if there's nothing to do
914 if (f.isReady()) return;
916 folly::fibers::Baton baton;
917 f = f.then([&](Try<T> t) {
919 return makeFuture(std::move(t));
923 // There's a race here between the return here and the actual finishing of
924 // the future. f is completed, but the setup may not have finished on done
925 // after the baton has posted.
926 while (!f.isReady()) {
927 std::this_thread::yield();
// Blocking wait bounded by dur. The baton lives in a shared_ptr captured by
// the continuation, so it stays alive even if this function returns on
// timeout before the continuation has run. When timed_wait reports success,
// the function spins until f is actually ready.
// NOTE(review): presumably the continuation posts the baton — confirm.
932 void waitImpl(Future<T>& f, Duration dur) {
933 // short-circuit if there's nothing to do
934 if (f.isReady()) return;
936 auto baton = std::make_shared<folly::fibers::Baton>();
937 f = f.then([baton](Try<T> t) {
939 return makeFuture(std::move(t));
942 // Let's preserve the invariant that if we did not timeout (timed_wait returns
943 // true), then the returned Future is complete when it is returned to the
944 // caller. We need to wait out the race for that Future to complete.
945 if (baton->timed_wait(dur)) {
946 while (!f.isReady()) {
947 std::this_thread::yield();
// Loops until f is ready.
// NOTE(review): presumably each iteration drives the executor e — confirm.
953 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
954 while (!f.isReady()) {
// Lvalue overload: blocks through detail::waitImpl on this Future.
962 Future<T>& Future<T>::wait() & {
963 detail::waitImpl(*this);
// Rvalue overload: same wait, then returns *this as an rvalue reference.
968 Future<T>&& Future<T>::wait() && {
969 detail::waitImpl(*this);
970 return std::move(*this);
// Lvalue overload: waits for at most dur through detail::waitImpl.
974 Future<T>& Future<T>::wait(Duration dur) & {
975 detail::waitImpl(*this, dur);
// Rvalue overload: same bounded wait, then returns *this as an rvalue
// reference.
980 Future<T>&& Future<T>::wait(Duration dur) && {
981 detail::waitImpl(*this, dur);
982 return std::move(*this);
// Lvalue overload: waits through detail::waitViaImpl using executor e.
986 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
987 detail::waitViaImpl(*this, e);
// Rvalue overload: same wait, then returns *this as an rvalue reference.
992 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
993 detail::waitViaImpl(*this, e);
994 return std::move(*this);
// Blocks via wait(), then moves the value out of the Future.
999 return std::move(wait().value());
// Timed get: returns the value by move.
// NOTE(review): presumably waits up to dur first and reports a timeout when
// the Future is still not ready — confirm.
1003 T Future<T>::get(Duration dur) {
1006 return std::move(value());
// Waits through waitVia(e), then moves the value out of the Future.
1013 T Future<T>::getVia(DrivableExecutor* e) {
1014 return std::move(waitVia(e).value());
// Compares the values held by two Trys with operator==. Both Trys are accessed
// through value().
// NOTE(review): the caller is presumably expected to check hasValue() first —
// willEqual in this file does.
1020 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1021 return t1.value() == t2.value();
// Combines this Future with f through collectAll and, when both results hold
// values, compares them with detail::TryEquals<T>::equals.
// NOTE(review): presumably yields false when either side holds an exception —
// confirm.
1027 Future<bool> Future<T>::willEqual(Future<T>& f) {
1028 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1029 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1030 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
// Chains a continuation that applies predicate to a const reference to the
// value and throws PredicateDoesNotObtain when the predicate returns false.
// NOTE(review): presumably the value is passed through otherwise — confirm.
1039 Future<T> Future<T>::filter(F predicate) {
1040 auto p = folly::makeMoveWrapper(std::move(predicate));
1041 return this->then([p](T val) {
1042 T const& valConstRef = val;
1043 if (!(*p)(valConstRef)) {
1044 throw PredicateDoesNotObtain();
// Base case of thenMulti: a single callback is simply forwarded to then().
1051 template <class Callback>
1052 auto Future<T>::thenMulti(Callback&& fn)
1053 -> decltype(this->then(std::forward<Callback>(fn))) {
1054 // thenMulti with one callback is just a then
1055 return then(std::forward<Callback>(fn));
// Recursive case of thenMulti: chains fn with then() and hands the remaining
// callbacks to thenMulti on the resulting Future.
1059 template <class Callback, class... Callbacks>
1060 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1061 -> decltype(this->then(std::forward<Callback>(fn)).
1062 thenMulti(std::forward<Callbacks>(fns)...)) {
1063 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1064 return then(std::forward<Callback>(fn)).
1065 thenMulti(std::forward<Callbacks>(fns)...);
// Chains fn and the remaining callbacks like thenMulti, then applies
// via(oldX) to the result, where oldX is the executor saved at entry.
// NOTE(review): presumably x is installed as the executor between saving oldX
// and chaining — confirm.
1069 template <class Callback, class... Callbacks>
1070 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1072 -> decltype(this->then(std::forward<Callback>(fn)).
1073 thenMulti(std::forward<Callbacks>(fns)...)) {
1074 // thenMultiExecutor with two callbacks is
1075 // via(x).then(a).thenMulti(b, ...).via(oldX)
1076 auto oldX = getExecutor();
1078 return then(std::forward<Callback>(fn)).
1079 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
// Single-callback case: delegates to the then(Executor*, ...) overload.
1083 template <class Callback>
1084 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1085 -> decltype(this->then(std::forward<Callback>(fn))) {
1086 // thenMulti with one callback is just a then with an executor
1087 return then(x, std::forward<Callback>(fn));
// Runs thunk only when p is true and returns its result through unit();
// otherwise returns makeFuture() without calling thunk.
1091 inline Future<Unit> when(bool p, F thunk) {
1092 return p ? thunk().unit() : makeFuture();
// Asynchronous loop: each round runs thunk and, from its continuation, calls
// whileDo again with copies of predicate and thunk. The final statement
// returns makeFuture().
// NOTE(review): presumably the recursion is guarded by predicate() — confirm.
1095 template <class P, class F>
1096 Future<Unit> whileDo(P predicate, F thunk) {
1098 return thunk().then([=] {
1099 return whileDo(predicate, thunk);
1102 return makeFuture();
// Runs a whileDo loop whose predicate atomically increments a heap-allocated
// counter and holds while the pre-increment value is below n. The counter is
// held in a unique_ptr inside a MoveWrapper so the by-copy capture can carry
// it.
// NOTE(review): thunk is presumably the loop body passed to whileDo — confirm.
1106 Future<Unit> times(const int n, F thunk) {
1107 auto count = folly::makeMoveWrapper(
1108 std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1110 return folly::whileDo([=]() mutable {
1111 return (*count)->fetch_add(1) < n;
// Chains func onto every future in [first, last) with then() and collects the
// resulting futures, in input order, into a vector.
1116 template <class It, class F, class ItT, class Result>
1117 std::vector<Future<Result>> map(It first, It last, F func) {
1118 std::vector<Future<Result>> results;
1119 for (auto it = first; it != last; it++) {
1120 results.push_back(it->then(func));
// Empty tag types used for overload dispatch on the kind of retry policy: one
// for policies returning bool, one for policies returning Future<bool> (see
// retrying_policy_traits).
1130 struct retrying_policy_raw_tag {};
1131 struct retrying_policy_fut_tag {};
// Classifies a retry Policy by the return type of its
// operator()(size_t, const exception_wrapper&), const or non-const:
//   is_raw - the call operator returns bool
//   is_fut - the call operator returns Future<bool>
//   tag    - retrying_policy_raw_tag if is_raw, else retrying_policy_fut_tag
//            if is_fut, else void
1133 template <class Policy>
1134 struct retrying_policy_traits {
1135 using ew = exception_wrapper;
1136 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1137 template <class Ret>
1138 using has_op = typename std::integral_constant<bool,
1139 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1140 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1141 using is_raw = has_op<bool>;
1142 using is_fut = has_op<Future<bool>>;
1143 using tag = typename std::conditional<
1144 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1145 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// Core retry loop. On error, the policy is asked (with the attempt counter k
// and the exception) whether to retry; its Future<bool> answer leads either to
// a recursive retrying() call or to a failed Future<T> carrying the original
// exception. p and ff are copied into MoveWrappers.
// NOTE(review): f is presumably the Future produced by calling ff with the
// attempt counter — confirm.
1148 template <class Policy, class FF>
1149 typename std::result_of<FF(size_t)>::type
1150 retrying(size_t k, Policy&& p, FF&& ff) {
1151 using F = typename std::result_of<FF(size_t)>::type;
1152 using T = typename F::value_type;
1154 auto pm = makeMoveWrapper(p);
1155 auto ffm = makeMoveWrapper(ff);
1156 return f.onError([=](exception_wrapper x) mutable {
1157 auto q = (*pm)(k, x);
1158 auto xm = makeMoveWrapper(std::move(x));
1159 return q.then([=](bool r) mutable {
1161 ? retrying(k, pm.move(), ffm.move())
1162 : makeFuture<T>(xm.move());
// Dispatch target for policies returning bool: adapts the policy into one
// returning Future<bool> via makeFuture<bool> and starts the retry loop at
// attempt 0.
1167 template <class Policy, class FF>
1168 typename std::result_of<FF(size_t)>::type
1169 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1170 auto pm = makeMoveWrapper(std::move(p));
1171 auto q = [=](size_t k, exception_wrapper x) {
1172 return makeFuture<bool>((*pm)(k, x));
1174 return retrying(0, std::move(q), std::forward<FF>(ff));
// Dispatch target for policies already returning Future<bool>: starts the
// retry loop at attempt 0 with the policy forwarded unchanged.
1177 template <class Policy, class FF>
1178 typename std::result_of<FF(size_t)>::type
1179 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1180 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
// Computes backoff = jitter * backoff_min * 2^(n - 1), where jitter is
// exp(x) with x drawn from a normal distribution with mean 0 and standard
// deviation jitter_param. The result is clamped to [backoff_min, backoff_max].
// NOTE(review): d is presumably an alias for Duration, and n an unsigned
// attempt number >= 1 (n == 0 would wrap in n - 1) — confirm against the full
// signature and callers.
1183 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1184 template <class URNG>
1185 Duration retryingJitteredExponentialBackoffDur(
1187 Duration backoff_min,
1188 Duration backoff_max,
1189 double jitter_param,
1192 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1193 auto jitter = std::exp(dist(rng));
1194 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1195 return std::max(backoff_min, std::min(backoff_max, backoff));
// Builds a retry policy returning Future<bool>. The policy refuses once n
// equals max_tries. Otherwise it consults the wrapped policy p; on a true
// answer it sleeps for a jittered exponential backoff and then resolves to
// true, and on a false answer it resolves to false. The RNG is moved into a
// shared_ptr that the returned closure captures.
1198 template <class Policy, class URNG>
1199 std::function<Future<bool>(size_t, const exception_wrapper&)>
1200 retryingPolicyCappedJitteredExponentialBackoff(
1202 Duration backoff_min,
1203 Duration backoff_max,
1204 double jitter_param,
1207 auto pm = makeMoveWrapper(std::move(p));
1208 auto rngp = std::make_shared<URNG>(std::move(rng));
1209 return [=](size_t n, const exception_wrapper& ex) mutable {
1210 if (n == max_tries) { return makeFuture(false); }
1211 return (*pm)(n, ex).then([=](bool v) {
1212 if (!v) { return makeFuture(false); }
1213 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1214 n, backoff_min, backoff_max, jitter_param, *rngp);
1215 return futures::sleep(backoff).then([] { return true; });
// Tag-dispatched overload for policies returning bool: wraps the policy's
// result with makeFuture and delegates to the Future-returning implementation.
1220 template <class Policy, class URNG>
1221 std::function<Future<bool>(size_t, const exception_wrapper&)>
1222 retryingPolicyCappedJitteredExponentialBackoff(
1224 Duration backoff_min,
1225 Duration backoff_max,
1226 double jitter_param,
1229 retrying_policy_raw_tag) {
1230 auto pm = makeMoveWrapper(std::move(p));
1231 auto q = [=](size_t n, const exception_wrapper& e) {
1232 return makeFuture((*pm)(n, e));
1234 return retryingPolicyCappedJitteredExponentialBackoff(
// Tag-dispatched overload for policies already returning Future<bool>:
// delegates to the implementation overload.
1243 template <class Policy, class URNG>
1244 std::function<Future<bool>(size_t, const exception_wrapper&)>
1245 retryingPolicyCappedJitteredExponentialBackoff(
1247 Duration backoff_min,
1248 Duration backoff_max,
1249 double jitter_param,
1252 retrying_policy_fut_tag) {
1253 return retryingPolicyCappedJitteredExponentialBackoff(
// Public entry point: picks the tag for Policy from
// detail::retrying_policy_traits and dispatches to the matching
// detail::retrying overload.
1264 template <class Policy, class FF>
1265 typename std::result_of<FF(size_t)>::type
1266 retrying(Policy&& p, FF&& ff) {
1267 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1268 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
// Returns a policy that permits a retry while the attempt number n is below
// max_tries. The exception is ignored.
1272 std::function<bool(size_t, const exception_wrapper&)>
1273 retryingPolicyBasic(
1275 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
// Public entry point taking a user policy: picks the tag for Policy from
// detail::retrying_policy_traits and forwards to the detail implementation.
1278 template <class Policy, class URNG>
1279 std::function<Future<bool>(size_t, const exception_wrapper&)>
1280 retryingPolicyCappedJitteredExponentialBackoff(
1282 Duration backoff_min,
1283 Duration backoff_max,
1284 double jitter_param,
1287 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1288 return detail::retryingPolicyCappedJitteredExponentialBackoff(
// Convenience overload without a user policy: supplies a policy p that always
// answers true and delegates to the policy-taking overload.
1299 std::function<Future<bool>(size_t, const exception_wrapper&)>
1300 retryingPolicyCappedJitteredExponentialBackoff(
1302 Duration backoff_min,
1303 Duration backoff_max,
1304 double jitter_param) {
1305 auto p = [](size_t, const exception_wrapper&) { return true; };
1306 return retryingPolicyCappedJitteredExponentialBackoff(
// Explicit instantiation declarations: including translation units will not
// implicitly instantiate Future for these types.
// NOTE(review): the matching explicit instantiation definitions presumably
// live in a .cpp file — confirm.
1317 // Instantiate the most common Future types to save compile time
1318 extern template class Future<Unit>;
1319 extern template class Future<bool>;
1320 extern template class Future<int>;
1321 extern template class Future<int64_t>;
1322 extern template class Future<std::string>;
1323 extern template class Future<double>;
1325 } // namespace folly