2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56 other.core_ = nullptr;
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61 std::swap(core_, other.core_);
66 template <class T2, typename>
67 Future<T>::Future(T2&& val)
68 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
71 template <typename T2>
72 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
73 : core_(new detail::Core<T>(Try<T>(T()))) {}
76 Future<T>::~Future() {
81 void Future<T>::detach() {
83 core_->detachFuture();
89 void Future<T>::throwIfInvalid() const {
96 void Future<T>::setCallback_(F&& func) {
98 core_->setCallback(std::forward<F>(func));
105 typename std::enable_if<isFuture<F>::value,
106 Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108 return then([](Future<typename isFuture<T>::Inner> internal_future) {
109 return internal_future;
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
121 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122 typedef typename R::ReturnsFuture::Inner B;
127 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
129 // grab the Future now before we lose our handle on the Promise
130 auto f = p.getFuture();
131 f.core_->setExecutorNoLock(getExecutor());
133 /* This is a bit tricky.
135 We can't just close over *this in case this Future gets moved. So we
136 make a new dummy Future. We could figure out something more
137 sophisticated that avoids making a new Future object when it can, as an
138 optimization. But this is correct.
140 core_ can't be moved, it is explicitly disallowed (as is copying). But
141 if there's ever a reason to allow it, this is one place that makes that
142 assumption and would need to be fixed. We use a standard shared pointer
143 for core_ (by copying it in), which means in essence obj holds a shared
144 pointer to itself. But this shouldn't leak because Promise will not
145 outlive the continuation, because Promise will setException() with a
146 broken Promise if it is destructed before completed. We could use a
147 weak pointer but it would have to be converted to a shared pointer when
148 func is executed (because the Future returned by func may possibly
149 persist beyond the callback, if it gets moved), and so it is an
150 optimization to just make it shared from the get-go.
152 Two subtle but important points about this design. detail::Core has no
153 back pointers to Future or Promise, so if Future or Promise get moved
154 (and they will be moved in performant code) we don't have to do
155 anything fancy. And because we store the continuation in the
156 detail::Core, not in the Future, we can execute the continuation even
157 after the Future has gone out of scope. This is an intentional design
158 decision. It is likely we will want to be able to cancel a continuation
159 in some circumstances, but I think it should be explicit not implicit
160 in the destruction of the Future used to create it.
163 [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
164 if (!isTry && t.hasException()) {
165 pm.setException(std::move(t.exception()));
168 return std::move(func)(t.template get<isTry, Args>()...);
176 // Variant: returns a Future
177 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
179 template <typename F, typename R, bool isTry, typename... Args>
180 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
181 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
182 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
183 typedef typename R::ReturnsFuture::Inner B;
188 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
190 // grab the Future now before we lose our handle on the Promise
191 auto f = p.getFuture();
192 f.core_->setExecutorNoLock(getExecutor());
194 setCallback_([ func = std::forward<F>(func), pm = std::move(p) ](
195 Try<T> && t) mutable {
197 if (!isTry && t.hasException()) {
198 return std::move(t.exception());
201 auto f2 = std::move(func)(t.template get<isTry, Args>()...);
202 // that didn't throw, now we can steal p
203 f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
204 p.setTry(std::move(b));
206 return exception_wrapper();
207 } catch (const std::exception& e) {
208 return exception_wrapper(std::current_exception(), e);
210 return exception_wrapper(std::current_exception());
215 pm.setException(std::move(ew));
222 template <typename T>
223 template <typename R, typename Caller, typename... Args>
224 Future<typename isFuture<R>::Inner>
225 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
226 typedef typename std::remove_cv<
227 typename std::remove_reference<
228 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
229 return then([instance, func](Try<T>&& t){
230 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
235 template <class Executor, class Arg, class... Args>
236 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
237 -> decltype(this->then(std::forward<Arg>(arg),
238 std::forward<Args>(args)...))
240 auto oldX = getExecutor();
242 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
247 Future<Unit> Future<T>::then() {
248 return then([] () {});
251 // onError where the callback returns T
254 typename std::enable_if<
255 !detail::callableWith<F, exception_wrapper>::value &&
256 !detail::Extract<F>::ReturnsFuture::value,
258 Future<T>::onError(F&& func) {
259 typedef typename detail::Extract<F>::FirstArg Exn;
261 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
262 "Return type of onError callback must be T or Future<T>");
265 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
266 auto f = p.getFuture();
269 [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
270 if (!t.template withException<Exn>([&](Exn& e) {
271 pm.setWith([&] { return std::move(func)(e); });
273 pm.setTry(std::move(t));
280 // onError where the callback returns Future<T>
283 typename std::enable_if<
284 !detail::callableWith<F, exception_wrapper>::value &&
285 detail::Extract<F>::ReturnsFuture::value,
287 Future<T>::onError(F&& func) {
289 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
290 "Return type of onError callback must be T or Future<T>");
291 typedef typename detail::Extract<F>::FirstArg Exn;
294 auto f = p.getFuture();
296 setCallback_([ pm = std::move(p), func = std::forward<F>(func) ](
297 Try<T> && t) mutable {
298 if (!t.template withException<Exn>([&](Exn& e) {
301 auto f2 = std::move(func)(e);
302 f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
303 pm.setTry(std::move(t2));
305 return exception_wrapper();
306 } catch (const std::exception& e2) {
307 return exception_wrapper(std::current_exception(), e2);
309 return exception_wrapper(std::current_exception());
313 pm.setException(std::move(ew));
316 pm.setTry(std::move(t));
325 Future<T> Future<T>::ensure(F&& func) {
326 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
328 return makeFuture(std::move(t));
334 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
335 return within(dur, tk).onError([funcw = std::forward<F>(func)](
336 TimedOut const&) { return funcw(); });
341 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
342 detail::Extract<F>::ReturnsFuture::value,
344 Future<T>::onError(F&& func) {
346 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
347 "Return type of onError callback must be T or Future<T>");
350 auto f = p.getFuture();
352 [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
353 if (t.hasException()) {
356 auto f2 = std::move(func)(std::move(t.exception()));
357 f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
358 pm.setTry(std::move(t2));
360 return exception_wrapper();
361 } catch (const std::exception& e2) {
362 return exception_wrapper(std::current_exception(), e2);
364 return exception_wrapper(std::current_exception());
368 pm.setException(std::move(ew));
371 pm.setTry(std::move(t));
378 // onError(exception_wrapper) that returns T
381 typename std::enable_if<
382 detail::callableWith<F, exception_wrapper>::value &&
383 !detail::Extract<F>::ReturnsFuture::value,
385 Future<T>::onError(F&& func) {
387 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
388 "Return type of onError callback must be T or Future<T>");
391 auto f = p.getFuture();
393 [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
394 if (t.hasException()) {
395 pm.setWith([&] { return std::move(func)(std::move(t.exception())); });
397 pm.setTry(std::move(t));
405 typename std::add_lvalue_reference<T>::type Future<T>::value() {
408 return core_->getTry().value();
412 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
415 return core_->getTry().value();
419 Try<T>& Future<T>::getTry() {
422 return core_->getTry();
426 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
427 return waitVia(e).getTry();
431 Optional<Try<T>> Future<T>::poll() {
433 if (core_->ready()) {
434 o = std::move(core_->getTry());
440 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
443 setExecutor(executor, priority);
445 return std::move(*this);
449 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
453 auto f = p.getFuture();
454 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
455 return std::move(f).via(executor, priority);
458 template <class Func>
459 auto via(Executor* x, Func&& func)
460 -> Future<typename isFuture<decltype(func())>::Inner>
462 // TODO make this actually more performant. :-P #7260175
463 return via(x).then(std::forward<Func>(func));
467 bool Future<T>::isReady() const {
469 return core_->ready();
473 bool Future<T>::hasValue() {
474 return getTry().hasValue();
478 bool Future<T>::hasException() {
479 return getTry().hasException();
483 void Future<T>::raise(exception_wrapper exception) {
484 core_->raise(std::move(exception));
490 Future<typename std::decay<T>::type> makeFuture(T&& t) {
491 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
494 inline // for multiple translation units
495 Future<Unit> makeFuture() {
496 return makeFuture(Unit{});
499 // makeFutureWith(Future<T>()) -> Future<T>
501 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
502 typename std::result_of<F()>::type>::type
503 makeFutureWith(F&& func) {
505 typename isFuture<typename std::result_of<F()>::type>::Inner;
508 } catch (std::exception& e) {
509 return makeFuture<InnerType>(
510 exception_wrapper(std::current_exception(), e));
512 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
516 // makeFutureWith(T()) -> Future<T>
517 // makeFutureWith(void()) -> Future<Unit>
519 typename std::enable_if<
520 !(isFuture<typename std::result_of<F()>::type>::value),
521 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
522 makeFutureWith(F&& func) {
524 typename Unit::Lift<typename std::result_of<F()>::type>::type;
525 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
531 Future<T> makeFuture(std::exception_ptr const& e) {
532 return makeFuture(Try<T>(e));
536 Future<T> makeFuture(exception_wrapper ew) {
537 return makeFuture(Try<T>(std::move(ew)));
540 template <class T, class E>
541 typename std::enable_if<std::is_base_of<std::exception, E>::value,
543 makeFuture(E const& e) {
544 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
548 Future<T> makeFuture(Try<T>&& t) {
549 return Future<T>(new detail::Core<T>(std::move(t)));
553 Future<Unit> via(Executor* executor, int8_t priority) {
554 return makeFuture().via(executor, priority);
557 // mapSetCallback calls func(i, Try<T>) when every future completes
559 template <class T, class InputIterator, class F>
560 void mapSetCallback(InputIterator first, InputIterator last, F func) {
561 for (size_t i = 0; first != last; ++first, ++i) {
562 first->setCallback_([func, i](Try<T>&& t) {
563 func(i, std::move(t));
568 // collectAll (variadic)
570 template <typename... Fs>
571 typename detail::CollectAllVariadicContext<
572 typename std::decay<Fs>::type::value_type...>::type
573 collectAll(Fs&&... fs) {
574 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
575 typename std::decay<Fs>::type::value_type...>>();
576 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
577 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
578 return ctx->p.getFuture();
581 // collectAll (iterator)
583 template <class InputIterator>
586 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
587 collectAll(InputIterator first, InputIterator last) {
589 typename std::iterator_traits<InputIterator>::value_type::value_type T;
591 struct CollectAllContext {
592 CollectAllContext(size_t n) : results(n) {}
593 ~CollectAllContext() {
594 p.setValue(std::move(results));
596 Promise<std::vector<Try<T>>> p;
597 std::vector<Try<T>> results;
601 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
602 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
603 ctx->results[i] = std::move(t);
605 return ctx->p.getFuture();
608 // collect (iterator)
612 template <typename T>
613 struct CollectContext {
615 explicit Nothing(int /* n */) {}
618 using Result = typename std::conditional<
619 std::is_void<T>::value,
621 std::vector<T>>::type;
623 using InternalResult = typename std::conditional<
624 std::is_void<T>::value,
626 std::vector<Optional<T>>>::type;
628 explicit CollectContext(size_t n) : result(n) {}
630 if (!threw.exchange(true)) {
631 // map Optional<T> -> T
632 std::vector<T> finalResult;
633 finalResult.reserve(result.size());
634 std::transform(result.begin(), result.end(),
635 std::back_inserter(finalResult),
636 [](Optional<T>& o) { return std::move(o.value()); });
637 p.setValue(std::move(finalResult));
640 inline void setPartialResult(size_t i, Try<T>& t) {
641 result[i] = std::move(t.value());
644 InternalResult result;
645 std::atomic<bool> threw {false};
650 template <class InputIterator>
651 Future<typename detail::CollectContext<
652 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
653 collect(InputIterator first, InputIterator last) {
655 typename std::iterator_traits<InputIterator>::value_type::value_type T;
657 auto ctx = std::make_shared<detail::CollectContext<T>>(
658 std::distance(first, last));
659 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
660 if (t.hasException()) {
661 if (!ctx->threw.exchange(true)) {
662 ctx->p.setException(std::move(t.exception()));
664 } else if (!ctx->threw) {
665 ctx->setPartialResult(i, t);
668 return ctx->p.getFuture();
671 // collect (variadic)
673 template <typename... Fs>
674 typename detail::CollectVariadicContext<
675 typename std::decay<Fs>::type::value_type...>::type
676 collect(Fs&&... fs) {
677 auto ctx = std::make_shared<detail::CollectVariadicContext<
678 typename std::decay<Fs>::type::value_type...>>();
679 detail::collectVariadicHelper<detail::CollectVariadicContext>(
680 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
681 return ctx->p.getFuture();
684 // collectAny (iterator)
686 template <class InputIterator>
691 std::iterator_traits<InputIterator>::value_type::value_type>>>
692 collectAny(InputIterator first, InputIterator last) {
694 typename std::iterator_traits<InputIterator>::value_type::value_type T;
696 struct CollectAnyContext {
697 CollectAnyContext() {}
698 Promise<std::pair<size_t, Try<T>>> p;
699 std::atomic<bool> done {false};
702 auto ctx = std::make_shared<CollectAnyContext>();
703 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
704 if (!ctx->done.exchange(true)) {
705 ctx->p.setValue(std::make_pair(i, std::move(t)));
708 return ctx->p.getFuture();
711 // collectAnyWithoutException (iterator)
713 template <class InputIterator>
716 typename std::iterator_traits<InputIterator>::value_type::value_type>>
717 collectAnyWithoutException(InputIterator first, InputIterator last) {
719 typename std::iterator_traits<InputIterator>::value_type::value_type T;
721 struct CollectAnyWithoutExceptionContext {
722 CollectAnyWithoutExceptionContext(){}
723 Promise<std::pair<size_t, T>> p;
724 std::atomic<bool> done{false};
725 std::atomic<size_t> nFulfilled{0};
729 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
730 ctx->nTotal = size_t(std::distance(first, last));
732 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
733 if (!t.hasException() && !ctx->done.exchange(true)) {
734 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
735 } else if (++ctx->nFulfilled == ctx->nTotal) {
736 ctx->p.setException(t.exception());
739 return ctx->p.getFuture();
742 // collectN (iterator)
744 template <class InputIterator>
745 Future<std::vector<std::pair<size_t, Try<typename
746 std::iterator_traits<InputIterator>::value_type::value_type>>>>
747 collectN(InputIterator first, InputIterator last, size_t n) {
749 std::iterator_traits<InputIterator>::value_type::value_type T;
750 typedef std::vector<std::pair<size_t, Try<T>>> V;
752 struct CollectNContext {
754 std::atomic<size_t> completed = {0};
757 auto ctx = std::make_shared<CollectNContext>();
759 if (size_t(std::distance(first, last)) < n) {
760 ctx->p.setException(std::runtime_error("Not enough futures"));
762 // for each completed Future, increase count and add to vector, until we
763 // have n completed futures at which point we fulfil our Promise with the
765 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
766 auto c = ++ctx->completed;
768 assert(ctx->v.size() < n);
769 ctx->v.emplace_back(i, std::move(t));
771 ctx->p.setTry(Try<V>(std::move(ctx->v)));
777 return ctx->p.getFuture();
782 template <class It, class T, class F>
783 Future<T> reduce(It first, It last, T&& initial, F&& func) {
785 return makeFuture(std::move(initial));
788 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
790 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
793 typedef isTry<Arg> IsTry;
795 auto sfunc = std::make_shared<F>(std::move(func));
797 auto f = first->then(
798 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
800 std::move(minitial), head.template get<IsTry::value, Arg&&>());
803 for (++first; first != last; ++first) {
804 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
805 return (*sfunc)(std::move(std::get<0>(t).value()),
806 // Either return a ItT&& or a Try<ItT>&& depending
807 // on the type of the argument of func.
808 std::get<1>(t).template get<IsTry::value, Arg&&>());
815 // window (collection)
817 template <class Collection, class F, class ItT, class Result>
818 std::vector<Future<Result>>
819 window(Collection input, F func, size_t n) {
820 struct WindowContext {
821 WindowContext(Collection&& i, F&& fn)
822 : input_(std::move(i)), promises_(input_.size()),
825 std::atomic<size_t> i_ {0};
827 std::vector<Promise<Result>> promises_;
830 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
831 size_t i = ctx->i_++;
832 if (i < ctx->input_.size()) {
833 // Using setCallback_ directly since we don't need the Future
834 ctx->func_(std::move(ctx->input_[i])).setCallback_(
835 // ctx is captured by value
836 [ctx, i](Try<Result>&& t) {
837 ctx->promises_[i].setTry(std::move(t));
838 // Chain another future onto this one
839 spawn(std::move(ctx));
845 auto max = std::min(n, input.size());
847 auto ctx = std::make_shared<WindowContext>(
848 std::move(input), std::move(func));
850 for (size_t i = 0; i < max; ++i) {
851 // Start the first n Futures
852 WindowContext::spawn(ctx);
855 std::vector<Future<Result>> futures;
856 futures.reserve(ctx->promises_.size());
857 for (auto& promise : ctx->promises_) {
858 futures.emplace_back(promise.getFuture());
867 template <class I, class F>
868 Future<I> Future<T>::reduce(I&& initial, F&& func) {
870 minitial = std::forward<I>(initial),
871 mfunc = std::forward<F>(func)
873 auto ret = std::move(minitial);
874 for (auto& val : vals) {
875 ret = mfunc(std::move(ret), std::move(val));
881 // unorderedReduce (iterator)
883 template <class It, class T, class F, class ItT, class Arg>
884 Future<T> unorderedReduce(It first, It last, T initial, F func) {
886 return makeFuture(std::move(initial));
889 typedef isTry<Arg> IsTry;
891 struct UnorderedReduceContext {
892 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
893 : lock_(), memo_(makeFuture<T>(std::move(memo))),
894 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
896 folly::MicroSpinLock lock_; // protects memo_ and numThens_
899 size_t numThens_; // how many Futures completed and called .then()
900 size_t numFutures_; // how many Futures in total
904 auto ctx = std::make_shared<UnorderedReduceContext>(
905 std::move(initial), std::move(func), std::distance(first, last));
910 [ctx](size_t /* i */, Try<ItT>&& t) {
911 // Futures can be completed in any order, simultaneously.
912 // To make this non-blocking, we create a new Future chain in
913 // the order of completion to reduce the values.
914 // The spinlock just protects chaining a new Future, not actually
915 // executing the reduce, which should be really fast.
916 folly::MSLGuard lock(ctx->lock_);
918 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
919 // Either return a ItT&& or a Try<ItT>&& depending
920 // on the type of the argument of func.
921 return ctx->func_(std::move(v),
922 mt.template get<IsTry::value, Arg&&>());
924 if (++ctx->numThens_ == ctx->numFutures_) {
925 // After reducing the value of the last Future, fulfill the Promise
926 ctx->memo_.setCallback_(
927 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
931 return ctx->promise_.getFuture();
937 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
938 return within(dur, TimedOut(), tk);
943 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
946 Context(E ex) : exception(std::move(ex)), promise() {}
948 Future<Unit> thisFuture;
950 std::atomic<bool> token {false};
953 std::shared_ptr<Timekeeper> tks;
955 tks = folly::detail::getTimekeeperSingleton();
956 tk = DCHECK_NOTNULL(tks.get());
959 auto ctx = std::make_shared<Context>(std::move(e));
961 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
962 // TODO: "this" completed first, cancel "after"
963 if (ctx->token.exchange(true) == false) {
964 ctx->promise.setTry(std::move(t));
968 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
969 // "after" completed first, cancel "this"
970 ctx->thisFuture.raise(TimedOut());
971 if (ctx->token.exchange(true) == false) {
972 if (t.hasException()) {
973 ctx->promise.setException(std::move(t.exception()));
975 ctx->promise.setException(std::move(ctx->exception));
980 return ctx->promise.getFuture().via(getExecutor());
986 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
987 return collectAll(*this, futures::sleep(dur, tk))
988 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
989 Try<T>& t = std::get<0>(tup);
990 return makeFuture<T>(std::move(t));
997 void waitImpl(Future<T>& f) {
998 // short-circuit if there's nothing to do
999 if (f.isReady()) return;
1001 FutureBatonType baton;
1002 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1004 assert(f.isReady());
1008 void waitImpl(Future<T>& f, Duration dur) {
1009 // short-circuit if there's nothing to do
1015 auto ret = promise.getFuture();
1016 auto baton = std::make_shared<FutureBatonType>();
1017 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1018 promise.setTry(std::move(t));
1022 if (baton->timed_wait(dur)) {
1023 assert(f.isReady());
1028 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1029 // Set callback so to ensure that the via executor has something on it
1030 // so that once the preceding future triggers this callback, drive will
1031 // always have a callback to satisfy it
1034 f = f.via(e).then([](T&& t) { return std::move(t); });
1035 while (!f.isReady()) {
1038 assert(f.isReady());
1044 Future<T>& Future<T>::wait() & {
1045 detail::waitImpl(*this);
1050 Future<T>&& Future<T>::wait() && {
1051 detail::waitImpl(*this);
1052 return std::move(*this);
1056 Future<T>& Future<T>::wait(Duration dur) & {
1057 detail::waitImpl(*this, dur);
1062 Future<T>&& Future<T>::wait(Duration dur) && {
1063 detail::waitImpl(*this, dur);
1064 return std::move(*this);
1068 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1069 detail::waitViaImpl(*this, e);
1074 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1075 detail::waitViaImpl(*this, e);
1076 return std::move(*this);
1080 T Future<T>::get() {
1081 return std::move(wait().value());
1085 T Future<T>::get(Duration dur) {
1088 return std::move(value());
1095 T Future<T>::getVia(DrivableExecutor* e) {
1096 return std::move(waitVia(e).value());
1102 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1103 return t1.value() == t2.value();
1109 Future<bool> Future<T>::willEqual(Future<T>& f) {
1110 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1111 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1112 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1121 Future<T> Future<T>::filter(F&& predicate) {
1122 return this->then([p = std::forward<F>(predicate)](T val) {
1123 T const& valConstRef = val;
1124 if (!p(valConstRef)) {
1125 throw PredicateDoesNotObtain();
1132 template <class Callback>
1133 auto Future<T>::thenMulti(Callback&& fn)
1134 -> decltype(this->then(std::forward<Callback>(fn))) {
1135 // thenMulti with one callback is just a then
1136 return then(std::forward<Callback>(fn));
1140 template <class Callback, class... Callbacks>
1141 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1142 -> decltype(this->then(std::forward<Callback>(fn)).
1143 thenMulti(std::forward<Callbacks>(fns)...)) {
1144 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1145 return then(std::forward<Callback>(fn)).
1146 thenMulti(std::forward<Callbacks>(fns)...);
1150 template <class Callback, class... Callbacks>
1151 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1153 -> decltype(this->then(std::forward<Callback>(fn)).
1154 thenMulti(std::forward<Callbacks>(fns)...)) {
1155 // thenMultiExecutor with two callbacks is
1156 // via(x).then(a).thenMulti(b, ...).via(oldX)
1157 auto oldX = getExecutor();
1159 return then(std::forward<Callback>(fn)).
1160 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1164 template <class Callback>
1165 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1166 -> decltype(this->then(std::forward<Callback>(fn))) {
1167 // thenMulti with one callback is just a then with an executor
1168 return then(x, std::forward<Callback>(fn));
1172 inline Future<Unit> when(bool p, F&& thunk) {
1173 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1176 template <class P, class F>
1177 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1179 auto future = thunk();
1180 return future.then([
1181 predicate = std::forward<P>(predicate),
1182 thunk = std::forward<F>(thunk)
1184 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1187 return makeFuture();
1191 Future<Unit> times(const int n, F&& thunk) {
1192 return folly::whileDo(
1193 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1194 return count->fetch_add(1) < n;
1196 std::forward<F>(thunk));
1200 template <class It, class F, class ItT, class Result>
1201 std::vector<Future<Result>> map(It first, It last, F func) {
1202 std::vector<Future<Result>> results;
1203 for (auto it = first; it != last; it++) {
1204 results.push_back(it->then(func));
1214 struct retrying_policy_raw_tag {};
1215 struct retrying_policy_fut_tag {};
1217 template <class Policy>
1218 struct retrying_policy_traits {
1219 using ew = exception_wrapper;
1220 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1221 template <class Ret>
1222 using has_op = typename std::integral_constant<bool,
1223 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1224 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1225 using is_raw = has_op<bool>;
1226 using is_fut = has_op<Future<bool>>;
1227 using tag = typename std::conditional<
1228 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1229 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1232 template <class Policy, class FF, class Prom>
1233 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1234 using F = typename std::result_of<FF(size_t)>::type;
1235 using T = typename F::value_type;
1239 prom = std::move(prom),
1240 pm = std::forward<Policy>(p),
1241 ffm = std::forward<FF>(ff)
1242 ](Try<T> && t) mutable {
1244 prom.setValue(std::move(t).value());
1247 auto& x = t.exception();
1251 prom = std::move(prom),
1254 ffm = std::move(ffm)
1255 ](bool shouldRetry) mutable {
1257 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1259 prom.setException(std::move(xm));
1265 template <class Policy, class FF>
1266 typename std::result_of<FF(size_t)>::type
1267 retrying(size_t k, Policy&& p, FF&& ff) {
1268 using F = typename std::result_of<FF(size_t)>::type;
1269 using T = typename F::value_type;
1270 auto prom = Promise<T>();
1271 auto f = prom.getFuture();
1273 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
1277 template <class Policy, class FF>
1278 typename std::result_of<FF(size_t)>::type
1279 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1280 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1281 return makeFuture<bool>(pm(k, x));
1283 return retrying(0, std::move(q), std::forward<FF>(ff));
1286 template <class Policy, class FF>
1287 typename std::result_of<FF(size_t)>::type
1288 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1289 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1292 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1293 template <class URNG>
1294 Duration retryingJitteredExponentialBackoffDur(
1296 Duration backoff_min,
1297 Duration backoff_max,
1298 double jitter_param,
1301 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1302 auto jitter = std::exp(dist(rng));
1303 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1304 return std::max(backoff_min, std::min(backoff_max, backoff));
1307 template <class Policy, class URNG>
1308 std::function<Future<bool>(size_t, const exception_wrapper&)>
1309 retryingPolicyCappedJitteredExponentialBackoff(
1311 Duration backoff_min,
1312 Duration backoff_max,
1313 double jitter_param,
1317 pm = std::forward<Policy>(p),
1322 rngp = std::forward<URNG>(rng)
1323 ](size_t n, const exception_wrapper& ex) mutable {
1324 if (n == max_tries) {
1325 return makeFuture(false);
1327 return pm(n, ex).then(
1328 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1331 return makeFuture(false);
1333 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1334 n, backoff_min, backoff_max, jitter_param, rngp);
1335 return futures::sleep(backoff).then([] { return true; });
1340 template <class Policy, class URNG>
1341 std::function<Future<bool>(size_t, const exception_wrapper&)>
1342 retryingPolicyCappedJitteredExponentialBackoff(
1344 Duration backoff_min,
1345 Duration backoff_max,
1346 double jitter_param,
1349 retrying_policy_raw_tag) {
1350 auto q = [pm = std::forward<Policy>(p)](
1351 size_t n, const exception_wrapper& e) {
1352 return makeFuture(pm(n, e));
1354 return retryingPolicyCappedJitteredExponentialBackoff(
1359 std::forward<URNG>(rng),
1363 template <class Policy, class URNG>
1364 std::function<Future<bool>(size_t, const exception_wrapper&)>
1365 retryingPolicyCappedJitteredExponentialBackoff(
1367 Duration backoff_min,
1368 Duration backoff_max,
1369 double jitter_param,
1372 retrying_policy_fut_tag) {
1373 return retryingPolicyCappedJitteredExponentialBackoff(
1378 std::forward<URNG>(rng),
1379 std::forward<Policy>(p));
1383 template <class Policy, class FF>
1384 typename std::result_of<FF(size_t)>::type
1385 retrying(Policy&& p, FF&& ff) {
1386 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1387 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1391 std::function<bool(size_t, const exception_wrapper&)>
1392 retryingPolicyBasic(
1394 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1397 template <class Policy, class URNG>
1398 std::function<Future<bool>(size_t, const exception_wrapper&)>
1399 retryingPolicyCappedJitteredExponentialBackoff(
1401 Duration backoff_min,
1402 Duration backoff_max,
1403 double jitter_param,
1406 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1407 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1412 std::forward<URNG>(rng),
1413 std::forward<Policy>(p),
1418 std::function<Future<bool>(size_t, const exception_wrapper&)>
1419 retryingPolicyCappedJitteredExponentialBackoff(
1421 Duration backoff_min,
1422 Duration backoff_max,
1423 double jitter_param) {
1424 auto p = [](size_t, const exception_wrapper&) { return true; };
1425 return retryingPolicyCappedJitteredExponentialBackoff(
1436 // Instantiate the most common Future types to save compile time
1437 extern template class Future<Unit>;
1438 extern template class Future<bool>;
1439 extern template class Future<int>;
1440 extern template class Future<int64_t>;
1441 extern template class Future<std::string>;
1442 extern template class Future<double>;
1444 } // namespace folly