2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56 other.core_ = nullptr;
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61 std::swap(core_, other.core_);
66 template <class T2, typename>
67 Future<T>::Future(T2&& val)
68 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
71 template <typename T2>
72 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
73 : core_(new detail::Core<T>(Try<T>(T()))) {}
76 Future<T>::~Future() {
81 void Future<T>::detach() {
83 core_->detachFuture();
89 void Future<T>::throwIfInvalid() const {
96 void Future<T>::setCallback_(F&& func) {
98 core_->setCallback(std::forward<F>(func));
105 typename std::enable_if<isFuture<F>::value,
106 Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108 return then([](Future<typename isFuture<T>::Inner> internal_future) {
109 return internal_future;
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
121 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122 typedef typename R::ReturnsFuture::Inner B;
127 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
129 // grab the Future now before we lose our handle on the Promise
130 auto f = p.getFuture();
131 f.core_->setExecutorNoLock(getExecutor());
133 /* This is a bit tricky.
135 We can't just close over *this in case this Future gets moved. So we
136 make a new dummy Future. We could figure out something more
137 sophisticated that avoids making a new Future object when it can, as an
138 optimization. But this is correct.
140 core_ can't be moved, it is explicitly disallowed (as is copying). But
141 if there's ever a reason to allow it, this is one place that makes that
142 assumption and would need to be fixed. We use a standard shared pointer
143 for core_ (by copying it in), which means in essence obj holds a shared
144 pointer to itself. But this shouldn't leak because Promise will not
145 outlive the continuation, because Promise will setException() with a
146 broken Promise if it is destructed before completed. We could use a
147 weak pointer but it would have to be converted to a shared pointer when
148 func is executed (because the Future returned by func may possibly
149 persist beyond the callback, if it gets moved), and so it is an
150 optimization to just make it shared from the get-go.
152 Two subtle but important points about this design. detail::Core has no
153 back pointers to Future or Promise, so if Future or Promise get moved
154 (and they will be moved in performant code) we don't have to do
155 anything fancy. And because we store the continuation in the
156 detail::Core, not in the Future, we can execute the continuation even
157 after the Future has gone out of scope. This is an intentional design
158 decision. It is likely we will want to be able to cancel a continuation
159 in some circumstances, but I think it should be explicit not implicit
160 in the destruction of the Future used to create it.
162 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
163 Try<T> && t) mutable {
164 if (!isTry && t.hasException()) {
165 pm.setException(std::move(t.exception()));
167 pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
174 // Variant: returns a Future
175 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
177 template <typename F, typename R, bool isTry, typename... Args>
178 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
179 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
180 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
181 typedef typename R::ReturnsFuture::Inner B;
186 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
188 // grab the Future now before we lose our handle on the Promise
189 auto f = p.getFuture();
190 f.core_->setExecutorNoLock(getExecutor());
192 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
193 Try<T> && t) mutable {
195 if (!isTry && t.hasException()) {
196 return std::move(t.exception());
199 auto f2 = funcm(t.template get<isTry, Args>()...);
200 // that didn't throw, now we can steal p
201 f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
202 p.setTry(std::move(b));
204 return exception_wrapper();
205 } catch (const std::exception& e) {
206 return exception_wrapper(std::current_exception(), e);
208 return exception_wrapper(std::current_exception());
213 pm.setException(std::move(ew));
220 template <typename T>
221 template <typename R, typename Caller, typename... Args>
222 Future<typename isFuture<R>::Inner>
223 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
224 typedef typename std::remove_cv<
225 typename std::remove_reference<
226 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
227 return then([instance, func](Try<T>&& t){
228 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
233 template <class Executor, class Arg, class... Args>
234 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
235 -> decltype(this->then(std::forward<Arg>(arg),
236 std::forward<Args>(args)...))
238 auto oldX = getExecutor();
240 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
245 Future<Unit> Future<T>::then() {
246 return then([] () {});
249 // onError where the callback returns T
252 typename std::enable_if<
253 !detail::callableWith<F, exception_wrapper>::value &&
254 !detail::Extract<F>::ReturnsFuture::value,
256 Future<T>::onError(F&& func) {
257 typedef typename detail::Extract<F>::FirstArg Exn;
259 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
260 "Return type of onError callback must be T or Future<T>");
263 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
264 auto f = p.getFuture();
266 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
267 Try<T> && t) mutable {
268 if (!t.template withException<Exn>(
269 [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
270 pm.setTry(std::move(t));
277 // onError where the callback returns Future<T>
280 typename std::enable_if<
281 !detail::callableWith<F, exception_wrapper>::value &&
282 detail::Extract<F>::ReturnsFuture::value,
284 Future<T>::onError(F&& func) {
286 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
287 "Return type of onError callback must be T or Future<T>");
288 typedef typename detail::Extract<F>::FirstArg Exn;
291 auto f = p.getFuture();
293 setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
294 Try<T> && t) mutable {
295 if (!t.template withException<Exn>([&](Exn& e) {
299 f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
300 pm.setTry(std::move(t2));
302 return exception_wrapper();
303 } catch (const std::exception& e2) {
304 return exception_wrapper(std::current_exception(), e2);
306 return exception_wrapper(std::current_exception());
310 pm.setException(std::move(ew));
313 pm.setTry(std::move(t));
322 Future<T> Future<T>::ensure(F&& func) {
323 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
325 return makeFuture(std::move(t));
331 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
332 return within(dur, tk).onError([funcw = std::forward<F>(func)](
333 TimedOut const&) { return funcw(); });
338 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
339 detail::Extract<F>::ReturnsFuture::value,
341 Future<T>::onError(F&& func) {
343 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
344 "Return type of onError callback must be T or Future<T>");
347 auto f = p.getFuture();
349 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
350 if (t.hasException()) {
353 auto f2 = funcm(std::move(t.exception()));
354 f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
355 pm.setTry(std::move(t2));
357 return exception_wrapper();
358 } catch (const std::exception& e2) {
359 return exception_wrapper(std::current_exception(), e2);
361 return exception_wrapper(std::current_exception());
365 pm.setException(std::move(ew));
368 pm.setTry(std::move(t));
375 // onError(exception_wrapper) that returns T
378 typename std::enable_if<
379 detail::callableWith<F, exception_wrapper>::value &&
380 !detail::Extract<F>::ReturnsFuture::value,
382 Future<T>::onError(F&& func) {
384 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
385 "Return type of onError callback must be T or Future<T>");
388 auto f = p.getFuture();
390 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
391 if (t.hasException()) {
392 pm.setWith([&] { return funcm(std::move(t.exception())); });
394 pm.setTry(std::move(t));
402 typename std::add_lvalue_reference<T>::type Future<T>::value() {
405 return core_->getTry().value();
409 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
412 return core_->getTry().value();
416 Try<T>& Future<T>::getTry() {
419 return core_->getTry();
423 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
424 return waitVia(e).getTry();
428 Optional<Try<T>> Future<T>::poll() {
430 if (core_->ready()) {
431 o = std::move(core_->getTry());
437 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
440 setExecutor(executor, priority);
442 return std::move(*this);
446 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
450 auto f = p.getFuture();
451 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
452 return std::move(f).via(executor, priority);
455 template <class Func>
456 auto via(Executor* x, Func&& func)
457 -> Future<typename isFuture<decltype(func())>::Inner>
459 // TODO make this actually more performant. :-P #7260175
460 return via(x).then(std::forward<Func>(func));
464 bool Future<T>::isReady() const {
466 return core_->ready();
470 bool Future<T>::hasValue() {
471 return getTry().hasValue();
475 bool Future<T>::hasException() {
476 return getTry().hasException();
480 void Future<T>::raise(exception_wrapper exception) {
481 core_->raise(std::move(exception));
487 Future<typename std::decay<T>::type> makeFuture(T&& t) {
488 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
491 inline // for multiple translation units
492 Future<Unit> makeFuture() {
493 return makeFuture(Unit{});
496 // makeFutureWith(Future<T>()) -> Future<T>
498 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
499 typename std::result_of<F()>::type>::type
500 makeFutureWith(F&& func) {
502 typename isFuture<typename std::result_of<F()>::type>::Inner;
505 } catch (std::exception& e) {
506 return makeFuture<InnerType>(
507 exception_wrapper(std::current_exception(), e));
509 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
513 // makeFutureWith(T()) -> Future<T>
514 // makeFutureWith(void()) -> Future<Unit>
516 typename std::enable_if<
517 !(isFuture<typename std::result_of<F()>::type>::value),
518 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
519 makeFutureWith(F&& func) {
521 typename Unit::Lift<typename std::result_of<F()>::type>::type;
522 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
528 Future<T> makeFuture(std::exception_ptr const& e) {
529 return makeFuture(Try<T>(e));
533 Future<T> makeFuture(exception_wrapper ew) {
534 return makeFuture(Try<T>(std::move(ew)));
537 template <class T, class E>
538 typename std::enable_if<std::is_base_of<std::exception, E>::value,
540 makeFuture(E const& e) {
541 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
545 Future<T> makeFuture(Try<T>&& t) {
546 return Future<T>(new detail::Core<T>(std::move(t)));
550 Future<Unit> via(Executor* executor, int8_t priority) {
551 return makeFuture().via(executor, priority);
554 // mapSetCallback calls func(i, Try<T>) when every future completes
556 template <class T, class InputIterator, class F>
557 void mapSetCallback(InputIterator first, InputIterator last, F func) {
558 for (size_t i = 0; first != last; ++first, ++i) {
559 first->setCallback_([func, i](Try<T>&& t) {
560 func(i, std::move(t));
565 // collectAll (variadic)
567 template <typename... Fs>
568 typename detail::CollectAllVariadicContext<
569 typename std::decay<Fs>::type::value_type...>::type
570 collectAll(Fs&&... fs) {
571 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
572 typename std::decay<Fs>::type::value_type...>>();
573 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
574 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
575 return ctx->p.getFuture();
578 // collectAll (iterator)
580 template <class InputIterator>
583 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
584 collectAll(InputIterator first, InputIterator last) {
586 typename std::iterator_traits<InputIterator>::value_type::value_type T;
588 struct CollectAllContext {
589 CollectAllContext(size_t n) : results(n) {}
590 ~CollectAllContext() {
591 p.setValue(std::move(results));
593 Promise<std::vector<Try<T>>> p;
594 std::vector<Try<T>> results;
598 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
599 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
600 ctx->results[i] = std::move(t);
602 return ctx->p.getFuture();
605 // collect (iterator)
609 template <typename T>
610 struct CollectContext {
612 explicit Nothing(int /* n */) {}
615 using Result = typename std::conditional<
616 std::is_void<T>::value,
618 std::vector<T>>::type;
620 using InternalResult = typename std::conditional<
621 std::is_void<T>::value,
623 std::vector<Optional<T>>>::type;
625 explicit CollectContext(size_t n) : result(n) {}
627 if (!threw.exchange(true)) {
628 // map Optional<T> -> T
629 std::vector<T> finalResult;
630 finalResult.reserve(result.size());
631 std::transform(result.begin(), result.end(),
632 std::back_inserter(finalResult),
633 [](Optional<T>& o) { return std::move(o.value()); });
634 p.setValue(std::move(finalResult));
637 inline void setPartialResult(size_t i, Try<T>& t) {
638 result[i] = std::move(t.value());
641 InternalResult result;
642 std::atomic<bool> threw {false};
647 template <class InputIterator>
648 Future<typename detail::CollectContext<
649 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
650 collect(InputIterator first, InputIterator last) {
652 typename std::iterator_traits<InputIterator>::value_type::value_type T;
654 auto ctx = std::make_shared<detail::CollectContext<T>>(
655 std::distance(first, last));
656 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
657 if (t.hasException()) {
658 if (!ctx->threw.exchange(true)) {
659 ctx->p.setException(std::move(t.exception()));
661 } else if (!ctx->threw) {
662 ctx->setPartialResult(i, t);
665 return ctx->p.getFuture();
668 // collect (variadic)
670 template <typename... Fs>
671 typename detail::CollectVariadicContext<
672 typename std::decay<Fs>::type::value_type...>::type
673 collect(Fs&&... fs) {
674 auto ctx = std::make_shared<detail::CollectVariadicContext<
675 typename std::decay<Fs>::type::value_type...>>();
676 detail::collectVariadicHelper<detail::CollectVariadicContext>(
677 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
678 return ctx->p.getFuture();
681 // collectAny (iterator)
683 template <class InputIterator>
688 std::iterator_traits<InputIterator>::value_type::value_type>>>
689 collectAny(InputIterator first, InputIterator last) {
691 typename std::iterator_traits<InputIterator>::value_type::value_type T;
693 struct CollectAnyContext {
694 CollectAnyContext() {}
695 Promise<std::pair<size_t, Try<T>>> p;
696 std::atomic<bool> done {false};
699 auto ctx = std::make_shared<CollectAnyContext>();
700 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
701 if (!ctx->done.exchange(true)) {
702 ctx->p.setValue(std::make_pair(i, std::move(t)));
705 return ctx->p.getFuture();
708 // collectAnyWithoutException (iterator)
710 template <class InputIterator>
713 typename std::iterator_traits<InputIterator>::value_type::value_type>>
714 collectAnyWithoutException(InputIterator first, InputIterator last) {
716 typename std::iterator_traits<InputIterator>::value_type::value_type T;
718 struct CollectAnyWithoutExceptionContext {
719 CollectAnyWithoutExceptionContext(){}
720 Promise<std::pair<size_t, T>> p;
721 std::atomic<bool> done{false};
722 std::atomic<size_t> nFulfilled{0};
726 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
727 ctx->nTotal = size_t(std::distance(first, last));
729 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
730 if (!t.hasException() && !ctx->done.exchange(true)) {
731 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
732 } else if (++ctx->nFulfilled == ctx->nTotal) {
733 ctx->p.setException(t.exception());
736 return ctx->p.getFuture();
739 // collectN (iterator)
741 template <class InputIterator>
742 Future<std::vector<std::pair<size_t, Try<typename
743 std::iterator_traits<InputIterator>::value_type::value_type>>>>
744 collectN(InputIterator first, InputIterator last, size_t n) {
746 std::iterator_traits<InputIterator>::value_type::value_type T;
747 typedef std::vector<std::pair<size_t, Try<T>>> V;
749 struct CollectNContext {
751 std::atomic<size_t> completed = {0};
754 auto ctx = std::make_shared<CollectNContext>();
756 if (size_t(std::distance(first, last)) < n) {
757 ctx->p.setException(std::runtime_error("Not enough futures"));
759 // for each completed Future, increase count and add to vector, until we
760 // have n completed futures at which point we fulfil our Promise with the
762 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
763 auto c = ++ctx->completed;
765 assert(ctx->v.size() < n);
766 ctx->v.emplace_back(i, std::move(t));
768 ctx->p.setTry(Try<V>(std::move(ctx->v)));
774 return ctx->p.getFuture();
779 template <class It, class T, class F>
780 Future<T> reduce(It first, It last, T&& initial, F&& func) {
782 return makeFuture(std::move(initial));
785 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
787 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
790 typedef isTry<Arg> IsTry;
792 auto sfunc = std::make_shared<F>(std::move(func));
794 auto f = first->then(
795 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
797 std::move(minitial), head.template get<IsTry::value, Arg&&>());
800 for (++first; first != last; ++first) {
801 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
802 return (*sfunc)(std::move(std::get<0>(t).value()),
803 // Either return a ItT&& or a Try<ItT>&& depending
804 // on the type of the argument of func.
805 std::get<1>(t).template get<IsTry::value, Arg&&>());
812 // window (collection)
814 template <class Collection, class F, class ItT, class Result>
815 std::vector<Future<Result>>
816 window(Collection input, F func, size_t n) {
817 struct WindowContext {
818 WindowContext(Collection&& i, F&& fn)
819 : input_(std::move(i)), promises_(input_.size()),
822 std::atomic<size_t> i_ {0};
824 std::vector<Promise<Result>> promises_;
827 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
828 size_t i = ctx->i_++;
829 if (i < ctx->input_.size()) {
830 // Using setCallback_ directly since we don't need the Future
831 ctx->func_(std::move(ctx->input_[i])).setCallback_(
832 // ctx is captured by value
833 [ctx, i](Try<Result>&& t) {
834 ctx->promises_[i].setTry(std::move(t));
835 // Chain another future onto this one
836 spawn(std::move(ctx));
842 auto max = std::min(n, input.size());
844 auto ctx = std::make_shared<WindowContext>(
845 std::move(input), std::move(func));
847 for (size_t i = 0; i < max; ++i) {
848 // Start the first n Futures
849 WindowContext::spawn(ctx);
852 std::vector<Future<Result>> futures;
853 futures.reserve(ctx->promises_.size());
854 for (auto& promise : ctx->promises_) {
855 futures.emplace_back(promise.getFuture());
864 template <class I, class F>
865 Future<I> Future<T>::reduce(I&& initial, F&& func) {
867 minitial = std::forward<I>(initial),
868 mfunc = std::forward<F>(func)
870 auto ret = std::move(minitial);
871 for (auto& val : vals) {
872 ret = mfunc(std::move(ret), std::move(val));
878 // unorderedReduce (iterator)
880 template <class It, class T, class F, class ItT, class Arg>
881 Future<T> unorderedReduce(It first, It last, T initial, F func) {
883 return makeFuture(std::move(initial));
886 typedef isTry<Arg> IsTry;
888 struct UnorderedReduceContext {
889 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
890 : lock_(), memo_(makeFuture<T>(std::move(memo))),
891 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
893 folly::MicroSpinLock lock_; // protects memo_ and numThens_
896 size_t numThens_; // how many Futures completed and called .then()
897 size_t numFutures_; // how many Futures in total
901 auto ctx = std::make_shared<UnorderedReduceContext>(
902 std::move(initial), std::move(func), std::distance(first, last));
907 [ctx](size_t /* i */, Try<ItT>&& t) {
908 // Futures can be completed in any order, simultaneously.
909 // To make this non-blocking, we create a new Future chain in
910 // the order of completion to reduce the values.
911 // The spinlock just protects chaining a new Future, not actually
912 // executing the reduce, which should be really fast.
913 folly::MSLGuard lock(ctx->lock_);
915 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
916 // Either return a ItT&& or a Try<ItT>&& depending
917 // on the type of the argument of func.
918 return ctx->func_(std::move(v),
919 mt.template get<IsTry::value, Arg&&>());
921 if (++ctx->numThens_ == ctx->numFutures_) {
922 // After reducing the value of the last Future, fulfill the Promise
923 ctx->memo_.setCallback_(
924 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
928 return ctx->promise_.getFuture();
934 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
935 return within(dur, TimedOut(), tk);
940 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
943 Context(E ex) : exception(std::move(ex)), promise() {}
945 Future<Unit> thisFuture;
947 std::atomic<bool> token {false};
950 std::shared_ptr<Timekeeper> tks;
952 tks = folly::detail::getTimekeeperSingleton();
953 tk = DCHECK_NOTNULL(tks.get());
956 auto ctx = std::make_shared<Context>(std::move(e));
958 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
959 // TODO: "this" completed first, cancel "after"
960 if (ctx->token.exchange(true) == false) {
961 ctx->promise.setTry(std::move(t));
965 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
966 // "after" completed first, cancel "this"
967 ctx->thisFuture.raise(TimedOut());
968 if (ctx->token.exchange(true) == false) {
969 if (t.hasException()) {
970 ctx->promise.setException(std::move(t.exception()));
972 ctx->promise.setException(std::move(ctx->exception));
977 return ctx->promise.getFuture().via(getExecutor());
983 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
984 return collectAll(*this, futures::sleep(dur, tk))
985 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
986 Try<T>& t = std::get<0>(tup);
987 return makeFuture<T>(std::move(t));
994 void waitImpl(Future<T>& f) {
995 // short-circuit if there's nothing to do
996 if (f.isReady()) return;
998 FutureBatonType baton;
999 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1001 assert(f.isReady());
1005 void waitImpl(Future<T>& f, Duration dur) {
1006 // short-circuit if there's nothing to do
1012 auto ret = promise.getFuture();
1013 auto baton = std::make_shared<FutureBatonType>();
1014 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1015 promise.setTry(std::move(t));
1019 if (baton->timed_wait(dur)) {
1020 assert(f.isReady());
1025 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1026 // Set callback so to ensure that the via executor has something on it
1027 // so that once the preceding future triggers this callback, drive will
1028 // always have a callback to satisfy it
1031 f = f.via(e).then([](T&& t) { return std::move(t); });
1032 while (!f.isReady()) {
1035 assert(f.isReady());
1041 Future<T>& Future<T>::wait() & {
1042 detail::waitImpl(*this);
1047 Future<T>&& Future<T>::wait() && {
1048 detail::waitImpl(*this);
1049 return std::move(*this);
1053 Future<T>& Future<T>::wait(Duration dur) & {
1054 detail::waitImpl(*this, dur);
1059 Future<T>&& Future<T>::wait(Duration dur) && {
1060 detail::waitImpl(*this, dur);
1061 return std::move(*this);
1065 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1066 detail::waitViaImpl(*this, e);
1071 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1072 detail::waitViaImpl(*this, e);
1073 return std::move(*this);
1077 T Future<T>::get() {
1078 return std::move(wait().value());
1082 T Future<T>::get(Duration dur) {
1085 return std::move(value());
1092 T Future<T>::getVia(DrivableExecutor* e) {
1093 return std::move(waitVia(e).value());
1099 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1100 return t1.value() == t2.value();
1106 Future<bool> Future<T>::willEqual(Future<T>& f) {
1107 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1108 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1109 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1118 Future<T> Future<T>::filter(F&& predicate) {
1119 return this->then([p = std::forward<F>(predicate)](T val) {
1120 T const& valConstRef = val;
1121 if (!p(valConstRef)) {
1122 throw PredicateDoesNotObtain();
1129 template <class Callback>
1130 auto Future<T>::thenMulti(Callback&& fn)
1131 -> decltype(this->then(std::forward<Callback>(fn))) {
1132 // thenMulti with one callback is just a then
1133 return then(std::forward<Callback>(fn));
1137 template <class Callback, class... Callbacks>
1138 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1139 -> decltype(this->then(std::forward<Callback>(fn)).
1140 thenMulti(std::forward<Callbacks>(fns)...)) {
1141 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1142 return then(std::forward<Callback>(fn)).
1143 thenMulti(std::forward<Callbacks>(fns)...);
1147 template <class Callback, class... Callbacks>
1148 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1150 -> decltype(this->then(std::forward<Callback>(fn)).
1151 thenMulti(std::forward<Callbacks>(fns)...)) {
1152 // thenMultiExecutor with two callbacks is
1153 // via(x).then(a).thenMulti(b, ...).via(oldX)
1154 auto oldX = getExecutor();
1156 return then(std::forward<Callback>(fn)).
1157 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1161 template <class Callback>
1162 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1163 -> decltype(this->then(std::forward<Callback>(fn))) {
1164 // thenMulti with one callback is just a then with an executor
1165 return then(x, std::forward<Callback>(fn));
1169 inline Future<Unit> when(bool p, F&& thunk) {
1170 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1173 template <class P, class F>
1174 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1176 auto future = thunk();
1177 return future.then([
1178 predicate = std::forward<P>(predicate),
1179 thunk = std::forward<F>(thunk)
1181 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1184 return makeFuture();
1188 Future<Unit> times(const int n, F&& thunk) {
1189 return folly::whileDo(
1190 [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1191 return count->fetch_add(1) < n;
1193 std::forward<F>(thunk));
1197 template <class It, class F, class ItT, class Result>
1198 std::vector<Future<Result>> map(It first, It last, F func) {
1199 std::vector<Future<Result>> results;
1200 for (auto it = first; it != last; it++) {
1201 results.push_back(it->then(func));
1211 struct retrying_policy_raw_tag {};
1212 struct retrying_policy_fut_tag {};
1214 template <class Policy>
1215 struct retrying_policy_traits {
1216 using ew = exception_wrapper;
1217 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1218 template <class Ret>
1219 using has_op = typename std::integral_constant<bool,
1220 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1221 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1222 using is_raw = has_op<bool>;
1223 using is_fut = has_op<Future<bool>>;
1224 using tag = typename std::conditional<
1225 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1226 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1229 template <class Policy, class FF>
1230 typename std::result_of<FF(size_t)>::type
1231 retrying(size_t k, Policy&& p, FF&& ff) {
1232 using F = typename std::result_of<FF(size_t)>::type;
1233 using T = typename F::value_type;
1236 [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1237 exception_wrapper x) mutable {
1240 [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1242 return r ? retrying(k, std::move(pm), std::move(ffm))
1243 : makeFuture<T>(std::move(xm));
1248 template <class Policy, class FF>
1249 typename std::result_of<FF(size_t)>::type
1250 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1251 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1252 return makeFuture<bool>(pm(k, x));
1254 return retrying(0, std::move(q), std::forward<FF>(ff));
1257 template <class Policy, class FF>
1258 typename std::result_of<FF(size_t)>::type
1259 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1260 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1263 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1264 template <class URNG>
1265 Duration retryingJitteredExponentialBackoffDur(
1267 Duration backoff_min,
1268 Duration backoff_max,
1269 double jitter_param,
1272 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1273 auto jitter = std::exp(dist(rng));
1274 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1275 return std::max(backoff_min, std::min(backoff_max, backoff));
1278 template <class Policy, class URNG>
1279 std::function<Future<bool>(size_t, const exception_wrapper&)>
1280 retryingPolicyCappedJitteredExponentialBackoff(
1282 Duration backoff_min,
1283 Duration backoff_max,
1284 double jitter_param,
1288 pm = std::forward<Policy>(p),
1293 rngp = std::forward<URNG>(rng)
1294 ](size_t n, const exception_wrapper& ex) mutable {
1295 if (n == max_tries) {
1296 return makeFuture(false);
1298 return pm(n, ex).then(
1299 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1302 return makeFuture(false);
1304 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1305 n, backoff_min, backoff_max, jitter_param, rngp);
1306 return futures::sleep(backoff).then([] { return true; });
1311 template <class Policy, class URNG>
1312 std::function<Future<bool>(size_t, const exception_wrapper&)>
1313 retryingPolicyCappedJitteredExponentialBackoff(
1315 Duration backoff_min,
1316 Duration backoff_max,
1317 double jitter_param,
1320 retrying_policy_raw_tag) {
1321 auto q = [pm = std::forward<Policy>(p)](
1322 size_t n, const exception_wrapper& e) {
1323 return makeFuture(pm(n, e));
1325 return retryingPolicyCappedJitteredExponentialBackoff(
1330 std::forward<URNG>(rng),
1334 template <class Policy, class URNG>
1335 std::function<Future<bool>(size_t, const exception_wrapper&)>
1336 retryingPolicyCappedJitteredExponentialBackoff(
1338 Duration backoff_min,
1339 Duration backoff_max,
1340 double jitter_param,
1343 retrying_policy_fut_tag) {
1344 return retryingPolicyCappedJitteredExponentialBackoff(
1349 std::forward<URNG>(rng),
1350 std::forward<Policy>(p));
1354 template <class Policy, class FF>
1355 typename std::result_of<FF(size_t)>::type
1356 retrying(Policy&& p, FF&& ff) {
1357 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1358 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1362 std::function<bool(size_t, const exception_wrapper&)>
1363 retryingPolicyBasic(
1365 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1368 template <class Policy, class URNG>
1369 std::function<Future<bool>(size_t, const exception_wrapper&)>
1370 retryingPolicyCappedJitteredExponentialBackoff(
1372 Duration backoff_min,
1373 Duration backoff_max,
1374 double jitter_param,
1377 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1378 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1383 std::forward<URNG>(rng),
1384 std::forward<Policy>(p),
1389 std::function<Future<bool>(size_t, const exception_wrapper&)>
1390 retryingPolicyCappedJitteredExponentialBackoff(
1392 Duration backoff_min,
1393 Duration backoff_max,
1394 double jitter_param) {
1395 auto p = [](size_t, const exception_wrapper&) { return true; };
1396 return retryingPolicyCappedJitteredExponentialBackoff(
1407 // Instantiate the most common Future types to save compile time
1408 extern template class Future<Unit>;
1409 extern template class Future<bool>;
1410 extern template class Future<int>;
1411 extern template class Future<int64_t>;
1412 extern template class Future<std::string>;
1413 extern template class Future<double>;
1415 } // namespace folly