2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
// Baton type used by the blocking wait helpers in this file (waitImpl).
// When FOLLY_FUTURE_USING_FIBER is nonzero the fiber baton is selected; the
// second typedef is the non-fiber alternative (the #else/#endif lines are not
// shown in this listing).
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
// Declaration only; the definition is not part of this listing.
// Future<T>::within() calls folly::detail::getTimekeeperSingleton() to obtain
// a Timekeeper and keeps the returned shared_ptr alive while it uses it.
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
// Move constructor: copies other's core_ pointer, then nulls other.core_ so
// that only the newly constructed Future refers to the core.
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56 other.core_ = nullptr;
// Move assignment: exchanges core_ with other via std::swap, so whatever core
// this Future held beforehand ends up in other.
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61 std::swap(core_, other.core_);
// Constructs a Future from a value: allocates a detail::Core<T> initialised
// with a Try<T> built from the forwarded val.
66 template <class T2, typename>
67 Future<T>::Future(T2&& val)
68 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Constructor enabled only when T2 is Unit (std::is_same<Unit, T2>): allocates
// a core holding a Try<T> of a value-initialised T. The pointer parameter
// exists only to carry the enable_if constraint and is unnamed.
71 template <typename T2>
72 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
73 : core_(new detail::Core<T>(Try<T>(T()))) {}
// Destructor. Its body is not visible in this listing.
// NOTE(review): presumably releases the core via detach() — confirm against
// the full source.
76 Future<T>::~Future() {
// Tells the core that this Future no longer refers to it by calling
// core_->detachFuture(). Any guard or reset of core_ around this call is on
// lines not shown in this listing.
81 void Future<T>::detach() {
83 core_->detachFuture();
// Const member whose body is not visible in this listing.
// NOTE(review): the name suggests it throws when the Future has no core —
// confirm against the full source.
89 void Future<T>::throwIfInvalid() const {
// Installs func as the callback on the core by perfect-forwarding it to
// core_->setCallback(). Used directly by combinators in this file that do not
// need the Future that then() would return.
96 void Future<T>::setCallback_(F&& func) {
98 core_->setCallback(std::forward<F>(func));
// Flattens a Future of a Future: chains a then() continuation that receives
// the inner Future<isFuture<T>::Inner> and returns it unchanged, so the result
// has the inner type. Only enabled when isFuture<F>::value is true.
105 typename std::enable_if<isFuture<F>::value,
106 Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108 return then([](Future<typename isFuture<T>::Inner> internal_future) {
109 return internal_future;
// then() implementation for callbacks that return a plain value
// (selected when R::ReturnsFuture::value is false).
//
// Visible steps: a Promise p (declared on a line not shown) inherits this
// core's interrupt handler; its Future f is taken before p is moved into the
// continuation, and f is given this Future's executor. The continuation
// captures func and the promise by move. When the callback does not take a
// Try (!isTry) and the incoming Try holds an exception, that exception is
// forwarded to the promise without invoking func; otherwise func is invoked
// with either the Try or its value (t.template get<isTry, Args>()).
// NOTE(review): the statement wrapping the final `return std::move(func)(...)`
// is not shown — presumably pm.setWith(...) — confirm.
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
121 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122 typedef typename R::ReturnsFuture::Inner B;
127 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
129 // grab the Future now before we lose our handle on the Promise
130 auto f = p.getFuture();
131 f.core_->setExecutorNoLock(getExecutor());
133 /* This is a bit tricky.
135 We can't just close over *this in case this Future gets moved. So we
136 make a new dummy Future. We could figure out something more
137 sophisticated that avoids making a new Future object when it can, as an
138 optimization. But this is correct.
140 core_ can't be moved, it is explicitly disallowed (as is copying). But
141 if there's ever a reason to allow it, this is one place that makes that
142 assumption and would need to be fixed. We use a standard shared pointer
143 for core_ (by copying it in), which means in essence obj holds a shared
144 pointer to itself. But this shouldn't leak because Promise will not
145 outlive the continuation, because Promise will setException() with a
146 broken Promise if it is destructed before completed. We could use a
147 weak pointer but it would have to be converted to a shared pointer when
148 func is executed (because the Future returned by func may possibly
149 persist beyond the callback, if it gets moved), and so it is an
150 optimization to just make it shared from the get-go.
152 Two subtle but important points about this design. detail::Core has no
153 back pointers to Future or Promise, so if Future or Promise get moved
154 (and they will be moved in performant code) we don't have to do
155 anything fancy. And because we store the continuation in the
156 detail::Core, not in the Future, we can execute the continuation even
157 after the Future has gone out of scope. This is an intentional design
158 decision. It is likely we will want to be able to cancel a continuation
159 in some circumstances, but I think it should be explicit not implicit
160 in the destruction of the Future used to create it.
163 [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
164 if (!isTry && t.hasException()) {
165 pm.setException(std::move(t.exception()));
168 return std::move(func)(t.template get<isTry, Args>()...);
// then() implementation for callbacks that themselves return a Future
// (selected when R::ReturnsFuture::value is true).
//
// Setup mirrors the value-returning variant: promise p inherits the interrupt
// handler, f is taken from p and given this Future's executor. Inside the
// continuation an exception_wrapper is computed: the incoming exception when
// the callback does not take a Try and t failed, an empty wrapper after func
// ran and a callback forwarding f2's Try<B> into the (moved) promise was
// attached, or the exception thrown by func. The last visible line passes that
// wrapper to pm.setException.
// NOTE(review): the guard deciding whether pm.setException(ew) runs is on a
// line not shown — confirm it only fires for a non-empty wrapper.
176 // Variant: returns a Future
177 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
179 template <typename F, typename R, bool isTry, typename... Args>
180 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
181 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
182 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
183 typedef typename R::ReturnsFuture::Inner B;
188 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
190 // grab the Future now before we lose our handle on the Promise
191 auto f = p.getFuture();
192 f.core_->setExecutorNoLock(getExecutor());
194 setCallback_([ func = std::forward<F>(func), pm = std::move(p) ](
195 Try<T> && t) mutable {
197 if (!isTry && t.hasException()) {
198 return std::move(t.exception());
201 auto f2 = std::move(func)(t.template get<isTry, Args>()...);
202 // that didn't throw, now we can steal p
203 f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
204 p.setTry(std::move(b));
206 return exception_wrapper();
207 } catch (const std::exception& e) {
208 return exception_wrapper(std::current_exception(), e);
210 return exception_wrapper(std::current_exception());
215 pm.setException(std::move(ew));
// then() overload taking a pointer to member function and an object.
// FirstArg is the member function's first parameter type with cv/reference
// stripped; isTry<FirstArg> decides whether the member is called with the
// Try<T> itself or with its contained value. The lambda captures `instance`
// as a raw pointer, so the object must still be alive when the continuation
// runs.
222 template <typename T>
223 template <typename R, typename Caller, typename... Args>
224 Future<typename isFuture<R>::Inner>
225 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
226 typedef typename std::remove_cv<
227 typename std::remove_reference<
228 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
229 return then([instance, func](Try<T>&& t){
230 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// then() overload that takes an Executor* in front of the usual then()
// arguments. The current executor is saved in oldX and the remaining arguments
// are forwarded to then().
// NOTE(review): the lines that switch to x and the call chained after the
// trailing '.' are not shown — presumably setExecutor(x) and .via(oldX), as in
// thenMultiWithExecutor — confirm.
235 template <class Executor, class Arg, class... Args>
236 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
237 -> decltype(this->then(std::forward<Arg>(arg),
238 std::forward<Args>(args)...))
240 auto oldX = getExecutor();
242 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
// Argument-less then(): chains an empty callback, producing a Future<Unit>.
247 Future<Unit> Future<T>::then() {
248 return then([] () {});
// Selected when F is not callable with exception_wrapper and does not return
// a Future. Exn is the callback's first argument type; the static_assert
// requires the callback's raw return type to be exactly T.
// In the continuation, t.withException<Exn> runs the inner lambda when the Try
// holds an exception of type Exn; that lambda completes the promise through
// pm.setWith(...) using func(e). When withException reports no match, the
// original Try is forwarded unchanged with pm.setTry.
251 // onError where the callback returns T
254 typename std::enable_if<
255 !detail::callableWith<F, exception_wrapper>::value &&
256 !detail::Extract<F>::ReturnsFuture::value,
258 Future<T>::onError(F&& func) {
259 typedef typename detail::Extract<F>::FirstArg Exn;
261 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
262 "Return type of onError callback must be T or Future<T>");
265 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
266 auto f = p.getFuture();
269 [ func = std::forward<F>(func), pm = std::move(p) ](Try<T> && t) mutable {
270 if (!t.template withException<Exn>([&](Exn& e) {
271 pm.setWith([&] { return std::move(func)(e); });
273 pm.setTry(std::move(t));
// Selected when F is not callable with exception_wrapper and returns a
// Future; the static_assert requires that return type to be Future<T>.
// When the Try holds an exception of type Exn, func(e) is called and a
// callback is attached to the returned future f2 that forwards its Try into
// the promise. If func itself throws, the exception is captured as an
// exception_wrapper and set on the promise. When the Try does not hold an Exn,
// it is forwarded unchanged with pm.setTry.
280 // onError where the callback returns Future<T>
283 typename std::enable_if<
284 !detail::callableWith<F, exception_wrapper>::value &&
285 detail::Extract<F>::ReturnsFuture::value,
287 Future<T>::onError(F&& func) {
289 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
290 "Return type of onError callback must be T or Future<T>");
291 typedef typename detail::Extract<F>::FirstArg Exn;
294 auto f = p.getFuture();
296 setCallback_([ pm = std::move(p), func = std::forward<F>(func) ](
297 Try<T> && t) mutable {
298 if (!t.template withException<Exn>([&](Exn& e) {
301 auto f2 = std::move(func)(e);
302 f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
303 pm.setTry(std::move(t2));
305 return exception_wrapper();
306 } catch (const std::exception& e2) {
307 return exception_wrapper(std::current_exception(), e2);
309 return exception_wrapper(std::current_exception());
313 pm.setException(std::move(ew));
316 pm.setTry(std::move(t));
// Chains a continuation that captures func (as funcw) and then re-emits the
// incoming Try unchanged via makeFuture(std::move(t)), so the result carries
// the same value or exception as this Future.
// NOTE(review): the line invoking funcw is not shown in this listing —
// presumably it is called before the return — confirm.
325 Future<T> Future<T>::ensure(F&& func) {
326 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
328 return makeFuture(std::move(t));
// Applies within(dur, tk) and attaches an onError handler for TimedOut that
// ignores the exception and returns the result of calling func.
334 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
335 return within(dur, tk).onError([funcw = std::forward<F>(func)](
336 TimedOut const&) { return std::move(funcw)(); });
// onError overload selected when F is callable with exception_wrapper and
// returns a Future; the static_assert requires that return type to be
// Future<T>.
// When the Try holds an exception, func receives the moved exception_wrapper
// and a callback on the returned future f2 forwards its Try into the promise.
// An exception thrown by func is captured as an exception_wrapper and set on
// the promise. A Try without an exception is forwarded unchanged.
341 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
342 detail::Extract<F>::ReturnsFuture::value,
344 Future<T>::onError(F&& func) {
346 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
347 "Return type of onError callback must be T or Future<T>");
350 auto f = p.getFuture();
352 [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
353 if (t.hasException()) {
356 auto f2 = std::move(func)(std::move(t.exception()));
357 f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
358 pm.setTry(std::move(t2));
360 return exception_wrapper();
361 } catch (const std::exception& e2) {
362 return exception_wrapper(std::current_exception(), e2);
364 return exception_wrapper(std::current_exception());
368 pm.setException(std::move(ew));
371 pm.setTry(std::move(t));
// Selected when F is callable with exception_wrapper and does not return a
// Future. If the Try holds an exception, the promise is completed through
// pm.setWith(...) using func applied to the moved exception_wrapper;
// otherwise the Try is forwarded unchanged.
// NOTE(review): the static_assert compares Extract<F>::Return with Future<T>,
// whereas the typed value-returning overload compares RawReturn with T —
// presumably Extract<F>::Return is already lifted to a Future type; confirm.
378 // onError(exception_wrapper) that returns T
381 typename std::enable_if<
382 detail::callableWith<F, exception_wrapper>::value &&
383 !detail::Extract<F>::ReturnsFuture::value,
385 Future<T>::onError(F&& func) {
387 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
388 "Return type of onError callback must be T or Future<T>");
391 auto f = p.getFuture();
393 [ pm = std::move(p), func = std::forward<F>(func) ](Try<T> t) mutable {
394 if (t.hasException()) {
395 pm.setWith([&] { return std::move(func)(std::move(t.exception())); });
397 pm.setTry(std::move(t));
// value(), mutable and const overloads: both return a reference to the value
// held in the core's Try via core_->getTry().value(). Any validity check that
// precedes the return is on lines not shown in this listing.
405 typename std::add_lvalue_reference<T>::type Future<T>::value() {
408 return core_->getTry().value();
412 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
415 return core_->getTry().value();
// Returns a reference to the Try stored in the core.
419 Try<T>& Future<T>::getTry() {
422 return core_->getTry();
// Calls waitVia(e) on this Future and then returns a reference to its Try.
426 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
427 return waitVia(e).getTry();
// If the core reports ready(), moves its Try into the Optional `o`; the
// declaration and return of `o` are on lines not shown in this listing.
// Note that the Try is moved out of the core, not copied.
431 Optional<Try<T>> Future<T>::poll() {
433 if (core_->ready()) {
434 o = std::move(core_->getTry());
// Rvalue overload: sets the executor and priority on this Future and returns
// it by move, so no new Future/Promise pair is created.
440 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
443 setExecutor(executor, priority);
445 return std::move(*this);
// Lvalue overload: takes a Future f from a promise p (declared on a line not
// shown), chains a then() on this Future that forwards the Try into p, and
// returns f routed through the rvalue via(executor, priority).
449 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
453 auto f = p.getFuture();
454 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
455 return std::move(f).via(executor, priority);
// Free-function convenience: equivalent to via(x).then(func). The return type
// is the Future of whatever func() yields, with a returned Future flattened
// through isFuture<...>::Inner.
458 template <class Func>
459 auto via(Executor* x, Func&& func)
460 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
461 // TODO make this actually more performant. :-P #7260175
462 return via(x).then(std::forward<Func>(func));
// Reports whether the core is ready, by returning core_->ready().
466 bool Future<T>::isReady() const {
468 return core_->ready();
// hasValue() / hasException(): thin wrappers that query the Try returned by
// getTry().
472 bool Future<T>::hasValue() {
473 return getTry().hasValue();
477 bool Future<T>::hasException() {
478 return getTry().hasException();
// Forwards the given exception_wrapper to the core with core_->raise().
// thenImplementation/onError propagate the core's interrupt handler along
// continuation chains (see setInterruptHandlerNoLock calls in this file).
482 void Future<T>::raise(exception_wrapper exception) {
483 core_->raise(std::move(exception));
// Wraps the forwarded value in a Try of the decayed type and delegates to the
// makeFuture(Try<T>&&) overload.
489 Future<typename std::decay<T>::type> makeFuture(T&& t) {
490 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Argument-less makeFuture(): returns a Future<Unit> built from Unit{}.
// Declared inline because this non-template function is defined in a header
// included from several translation units.
493 inline // for multiple translation units
494 Future<Unit> makeFuture() {
495 return makeFuture(Unit{});
498 // makeFutureWith(Future<T>()) -> Future<T>
500 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
501 typename std::result_of<F()>::type>::type
502 makeFutureWith(F&& func) {
504 typename isFuture<typename std::result_of<F()>::type>::Inner;
506 return std::forward<F>(func)();
507 } catch (std::exception& e) {
508 return makeFuture<InnerType>(
509 exception_wrapper(std::current_exception(), e));
511 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
// Overload for callables whose result is not a Future. The result type is
// passed through Unit::Lift (so a void result becomes Unit, per the comment
// below). func is invoked inside makeTryWith and the resulting Try is wrapped
// with makeFuture<LiftedResult>.
// NOTE(review): makeTryWith presumably captures exceptions thrown by func
// into the Try — confirm against its definition.
515 // makeFutureWith(T()) -> Future<T>
516 // makeFutureWith(void()) -> Future<Unit>
518 typename std::enable_if<
519 !(isFuture<typename std::result_of<F()>::type>::value),
520 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
521 makeFutureWith(F&& func) {
523 typename Unit::Lift<typename std::result_of<F()>::type>::type;
524 return makeFuture<LiftedResult>(
525 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// makeFuture from an exception: both overloads build a Try<T> from the given
// exception (std::exception_ptr or exception_wrapper) and delegate to
// makeFuture(Try<T>&&).
529 Future<T> makeFuture(std::exception_ptr const& e) {
530 return makeFuture(Try<T>(e));
534 Future<T> makeFuture(exception_wrapper ew) {
535 return makeFuture(Try<T>(std::move(ew)));
// makeFuture from a concrete exception object. Enabled only when E derives
// from std::exception; the exception is copied into an exception_wrapper via
// make_exception_wrapper<E>(e) and stored in a Try<T>.
538 template <class T, class E>
539 typename std::enable_if<std::is_base_of<std::exception, E>::value,
541 makeFuture(E const& e) {
542 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload the other makeFuture() variants funnel into: allocates a
// detail::Core<T> holding the moved Try and wraps it in a Future<T>.
546 Future<T> makeFuture(Try<T>&& t) {
547 return Future<T>(new detail::Core<T>(std::move(t)));
// Returns a Future<Unit> from makeFuture() routed via the given executor and
// priority.
551 Future<Unit> via(Executor* executor, int8_t priority) {
552 return makeFuture().via(executor, priority);
// Walks [first, last) and installs a callback on each future with
// setCallback_. Each callback holds its own copy of func plus the zero-based
// position i of the future in the range, and invokes func(i, Try<T>&&).
// func is taken by value and copied into every callback.
555 // mapSetCallback calls func(i, Try<T>) when every future completes
557 template <class T, class InputIterator, class F>
558 void mapSetCallback(InputIterator first, InputIterator last, F func) {
559 for (size_t i = 0; first != last; ++first, ++i) {
560 first->setCallback_([func, i](Try<T>&& t) {
561 func(i, std::move(t));
// Variadic collectAll: creates a shared CollectAllVariadicContext keyed on the
// value_type of each (decayed) argument, hands the context and the forwarded
// futures to collectVariadicHelper, and returns the Future of the context's
// promise.
566 // collectAll (variadic)
568 template <typename... Fs>
569 typename detail::CollectAllVariadicContext<
570 typename std::decay<Fs>::type::value_type...>::type
571 collectAll(Fs&&... fs) {
572 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
573 typename std::decay<Fs>::type::value_type...>>();
574 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
575 ctx, std::forward<Fs>(fs)...);
576 return ctx->p.getFuture();
// Iterator collectAll: T is the value_type of the futures in the range.
// A shared CollectAllContext holds a results vector pre-sized to
// std::distance(first, last). Each future's callback stores its Try at its
// own index. The promise is fulfilled in the context's destructor, i.e. when
// the last shared_ptr copy held by the callbacks (and by this function) is
// released.
579 // collectAll (iterator)
581 template <class InputIterator>
584 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
585 collectAll(InputIterator first, InputIterator last) {
587 typename std::iterator_traits<InputIterator>::value_type::value_type T;
589 struct CollectAllContext {
590 CollectAllContext(size_t n) : results(n) {}
591 ~CollectAllContext() {
592 p.setValue(std::move(results));
594 Promise<std::vector<Try<T>>> p;
595 std::vector<Try<T>> results;
599 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
600 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
601 ctx->results[i] = std::move(t);
603 return ctx->p.getFuture();
// Shared state for the iterator form of collect().
// Result / InternalResult are selected with std::conditional on
// std::is_void<T>: for non-void T they are std::vector<T> and
// std::vector<Optional<T>>; the void alternatives are on lines not shown
// (the Nothing helper type appears to serve that case — confirm).
// `threw` records whether the promise has already been completed, so that it
// is completed only once.
// NOTE(review): the header of the function containing the
// `if (!threw.exchange(true))` block is not shown — presumably the
// destructor, which would unwrap each Optional<T> and fulfil the promise when
// no exception was recorded; confirm.
606 // collect (iterator)
610 template <typename T>
611 struct CollectContext {
613 explicit Nothing(int /* n */) {}
616 using Result = typename std::conditional<
617 std::is_void<T>::value,
619 std::vector<T>>::type;
621 using InternalResult = typename std::conditional<
622 std::is_void<T>::value,
624 std::vector<Optional<T>>>::type;
626 explicit CollectContext(size_t n) : result(n) {}
628 if (!threw.exchange(true)) {
629 // map Optional<T> -> T
630 std::vector<T> finalResult;
631 finalResult.reserve(result.size());
632 std::transform(result.begin(), result.end(),
633 std::back_inserter(finalResult),
634 [](Optional<T>& o) { return std::move(o.value()); });
635 p.setValue(std::move(finalResult));
// Stores the value of future i; the Try's value is moved out.
638 inline void setPartialResult(size_t i, Try<T>& t) {
639 result[i] = std::move(t.value());
642 InternalResult result;
643 std::atomic<bool> threw {false};
// Iterator collect: creates a shared CollectContext<T> sized to the range.
// For each completed future: if it failed, the first failure (decided by
// threw.exchange(true)) sets the promise's exception; if it succeeded and no
// failure has been recorded yet, its value is stored at index i through
// setPartialResult. Values arriving after a failure are dropped.
648 template <class InputIterator>
649 Future<typename detail::CollectContext<
650 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
651 collect(InputIterator first, InputIterator last) {
653 typename std::iterator_traits<InputIterator>::value_type::value_type T;
655 auto ctx = std::make_shared<detail::CollectContext<T>>(
656 std::distance(first, last));
657 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
658 if (t.hasException()) {
659 if (!ctx->threw.exchange(true)) {
660 ctx->p.setException(std::move(t.exception()));
662 } else if (!ctx->threw) {
663 ctx->setPartialResult(i, t);
666 return ctx->p.getFuture();
// Variadic collect: same shape as the variadic collectAll, but built on
// CollectVariadicContext. The context and the forwarded futures go to
// collectVariadicHelper and the context's promise provides the result.
669 // collect (variadic)
671 template <typename... Fs>
672 typename detail::CollectVariadicContext<
673 typename std::decay<Fs>::type::value_type...>::type
674 collect(Fs&&... fs) {
675 auto ctx = std::make_shared<detail::CollectVariadicContext<
676 typename std::decay<Fs>::type::value_type...>>();
677 detail::collectVariadicHelper<detail::CollectVariadicContext>(
678 ctx, std::forward<Fs>(fs)...);
679 return ctx->p.getFuture();
// Completes with the index and Try of whichever future in the range finishes
// first. The atomic `done` flag guarantees that only the first callback to
// flip it fulfils the promise; later completions are ignored. The Try may hold
// either a value or an exception.
682 // collectAny (iterator)
684 template <class InputIterator>
689 std::iterator_traits<InputIterator>::value_type::value_type>>>
690 collectAny(InputIterator first, InputIterator last) {
692 typename std::iterator_traits<InputIterator>::value_type::value_type T;
694 struct CollectAnyContext {
695 CollectAnyContext() {}
696 Promise<std::pair<size_t, Try<T>>> p;
697 std::atomic<bool> done {false};
700 auto ctx = std::make_shared<CollectAnyContext>();
701 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
702 if (!ctx->done.exchange(true)) {
703 ctx->p.setValue(std::make_pair(i, std::move(t)));
706 return ctx->p.getFuture();
// Completes with the index and value of the first future in the range that
// finishes without an exception.
// A successful completion that wins the `done` flag fulfils the promise and
// does not touch nFulfilled. Every other completion increments nFulfilled;
// when that count reaches nTotal (the range length, declared on a line not
// shown) the promise is failed with the exception of that last Try.
709 // collectAnyWithoutException (iterator)
711 template <class InputIterator>
714 typename std::iterator_traits<InputIterator>::value_type::value_type>>
715 collectAnyWithoutException(InputIterator first, InputIterator last) {
717 typename std::iterator_traits<InputIterator>::value_type::value_type T;
719 struct CollectAnyWithoutExceptionContext {
720 CollectAnyWithoutExceptionContext(){}
721 Promise<std::pair<size_t, T>> p;
722 std::atomic<bool> done{false};
723 std::atomic<size_t> nFulfilled{0};
727 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
728 ctx->nTotal = size_t(std::distance(first, last));
730 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
731 if (!t.hasException() && !ctx->done.exchange(true)) {
732 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
733 } else if (++ctx->nFulfilled == ctx->nTotal) {
734 ctx->p.setException(t.exception());
737 return ctx->p.getFuture();
// Collects (index, Try) pairs from completing futures into a vector V and
// fulfils the promise with that vector.
// If the range holds fewer than n futures the promise is failed immediately
// with std::runtime_error("Not enough futures"). Otherwise each callback bumps
// the atomic `completed` counter and appends its (index, Try) to ctx->v; the
// conditions guarding the append and the final setTry are on lines not shown
// (presumably c <= n and c == n — confirm).
// NOTE(review): `completed` is atomic but ctx->v.emplace_back is not visibly
// synchronised; if callbacks can run concurrently on different threads the
// append would race — confirm how completion threads are constrained.
740 // collectN (iterator)
742 template <class InputIterator>
743 Future<std::vector<std::pair<size_t, Try<typename
744 std::iterator_traits<InputIterator>::value_type::value_type>>>>
745 collectN(InputIterator first, InputIterator last, size_t n) {
747 std::iterator_traits<InputIterator>::value_type::value_type T;
748 typedef std::vector<std::pair<size_t, Try<T>>> V;
750 struct CollectNContext {
752 std::atomic<size_t> completed = {0};
755 auto ctx = std::make_shared<CollectNContext>();
757 if (size_t(std::distance(first, last)) < n) {
758 ctx->p.setException(std::runtime_error("Not enough futures"));
760 // for each completed Future, increase count and add to vector, until we
761 // have n completed futures at which point we fulfil our Promise with the
763 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
764 auto c = ++ctx->completed;
766 assert(ctx->v.size() < n);
767 ctx->v.emplace_back(i, std::move(t));
769 ctx->p.setTry(Try<V>(std::move(ctx->v)));
775 return ctx->p.getFuture();
// Ordered reduce over a range of futures.
// An early return yields makeFuture(initial); its guard is on a line not shown
// (presumably first == last — confirm). func is moved into a shared_ptr so
// every continuation shares one copy. The first future is combined with
// `initial`; each later future is paired with the running result through
// collectAll and func is applied to the running value plus either the
// element's value or its Try, depending on which form func accepts
// (IsTry / Arg).
// The running result is read with .value(), so a failed step rethrows there.
780 template <class It, class T, class F>
781 Future<T> reduce(It first, It last, T&& initial, F&& func) {
783 return makeFuture(std::move(initial));
786 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
788 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
791 typedef isTry<Arg> IsTry;
793 auto sfunc = std::make_shared<F>(std::move(func));
795 auto f = first->then(
796 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
798 std::move(minitial), head.template get<IsTry::value, Arg&&>());
801 for (++first; first != last; ++first) {
802 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
803 return (*sfunc)(std::move(std::get<0>(t).value()),
804 // Either return a ItT&& or a Try<ItT>&& depending
805 // on the type of the argument of func.
806 std::get<1>(t).template get<IsTry::value, Arg&&>());
// Runs func over every element of `input` with at most n calls in flight.
// WindowContext owns the input, one Promise<Result> per element and the atomic
// cursor i_. spawn() claims the next index; if it is in range it calls func on
// that element and, when the returned future completes, fulfils promise i and
// calls spawn again to start the next element. The function starts
// min(n, input.size()) chains and returns one Future per promise, in input
// order.
// NOTE(review): inside the completion lambda `ctx` is a by-value capture of a
// non-mutable lambda, so std::move(ctx) yields a const rvalue that binds to
// spawn's const& parameter — the move has no effect.
813 // window (collection)
815 template <class Collection, class F, class ItT, class Result>
816 std::vector<Future<Result>>
817 window(Collection input, F func, size_t n) {
818 struct WindowContext {
819 WindowContext(Collection&& i, F&& fn)
820 : input_(std::move(i)), promises_(input_.size()),
823 std::atomic<size_t> i_ {0};
825 std::vector<Promise<Result>> promises_;
828 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
829 size_t i = ctx->i_++;
830 if (i < ctx->input_.size()) {
831 // Using setCallback_ directly since we don't need the Future
832 ctx->func_(std::move(ctx->input_[i])).setCallback_(
833 // ctx is captured by value
834 [ctx, i](Try<Result>&& t) {
835 ctx->promises_[i].setTry(std::move(t));
836 // Chain another future onto this one
837 spawn(std::move(ctx));
843 auto max = std::min(n, input.size());
845 auto ctx = std::make_shared<WindowContext>(
846 std::move(input), std::move(func));
848 for (size_t i = 0; i < max; ++i) {
849 // Start the first n Futures
850 WindowContext::spawn(ctx);
853 std::vector<Future<Result>> futures;
854 futures.reserve(ctx->promises_.size());
855 for (auto& promise : ctx->promises_) {
856 futures.emplace_back(promise.getFuture());
// Member reduce for a Future whose value is a collection: folds the elements
// into an accumulator that starts from the captured `initial`, calling
// mfunc(accumulator, element) for each element and moving both arguments.
// The enclosing then(...) call and the lambda's parameter list (which declares
// `vals`) are on lines not shown in this listing.
865 template <class I, class F>
866 Future<I> Future<T>::reduce(I&& initial, F&& func) {
868 minitial = std::forward<I>(initial),
869 mfunc = std::forward<F>(func)
871 auto ret = std::move(minitial);
872 for (auto& val : vals) {
873 ret = mfunc(std::move(ret), std::move(val));
// Reduces a range of futures in completion order rather than iteration order.
// UnorderedReduceContext holds the running result memo_ (seeded with
// `initial`), the reducer func_, counters for completed and total futures, and
// the promise for the final result. Each completion takes the spin lock and
// chains a then() onto memo_ that applies func_ to the running value and the
// completed element. After the last completion a callback on memo_ hands the
// final Try to the promise.
// An early return yields makeFuture(initial); its guard is on a line not shown
// (presumably first == last — confirm).
// NOTE(review): the final callback passes a Try<T> (t2) to promise_.setValue,
// whereas other blocks in this file pass a Try to setTry — confirm this is
// intentional.
879 // unorderedReduce (iterator)
881 template <class It, class T, class F, class ItT, class Arg>
882 Future<T> unorderedReduce(It first, It last, T initial, F func) {
884 return makeFuture(std::move(initial));
887 typedef isTry<Arg> IsTry;
889 struct UnorderedReduceContext {
890 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
891 : lock_(), memo_(makeFuture<T>(std::move(memo))),
892 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
894 folly::MicroSpinLock lock_; // protects memo_ and numThens_
897 size_t numThens_; // how many Futures completed and called .then()
898 size_t numFutures_; // how many Futures in total
902 auto ctx = std::make_shared<UnorderedReduceContext>(
903 std::move(initial), std::move(func), std::distance(first, last));
908 [ctx](size_t /* i */, Try<ItT>&& t) {
909 // Futures can be completed in any order, simultaneously.
910 // To make this non-blocking, we create a new Future chain in
911 // the order of completion to reduce the values.
912 // The spinlock just protects chaining a new Future, not actually
913 // executing the reduce, which should be really fast.
914 folly::MSLGuard lock(ctx->lock_);
916 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
917 // Either return a ItT&& or a Try<ItT>&& depending
918 // on the type of the argument of func.
919 return ctx->func_(std::move(v),
920 mt.template get<IsTry::value, Arg&&>());
922 if (++ctx->numThens_ == ctx->numFutures_) {
923 // After reducing the value of the last Future, fulfill the Promise
924 ctx->memo_.setCallback_(
925 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
929 return ctx->promise_.getFuture();
// Convenience overload: within(dur, e, tk) with a default-constructed TimedOut
// as the exception.
935 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
936 return within(dur, TimedOut(), tk);
// Races this Future against a timer of length dur.
// A shared Context holds the caller's exception e, the Future<Unit> returned
// by this Future's continuation, the result promise and an atomic token.
// Whichever side flips the token first completes the promise: this Future's
// continuation forwards its Try, while the timer continuation sets either the
// timer's own exception or the stored exception e.
// The timer continuation calls ctx->thisFuture.raise(TimedOut()) before it
// examines the token, so the interrupt is raised even if this Future already
// won. The returned Future is routed via this Future's executor.
// The Timekeeper singleton is fetched into `tks`, which keeps it alive for
// the rest of the call; the guard around that fetch is on a line not shown
// (presumably when tk is null — confirm).
941 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
944 Context(E ex) : exception(std::move(ex)), promise() {}
946 Future<Unit> thisFuture;
948 std::atomic<bool> token {false};
951 std::shared_ptr<Timekeeper> tks;
953 tks = folly::detail::getTimekeeperSingleton();
954 tk = DCHECK_NOTNULL(tks.get());
957 auto ctx = std::make_shared<Context>(std::move(e));
959 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
960 // TODO: "this" completed first, cancel "after"
961 if (ctx->token.exchange(true) == false) {
962 ctx->promise.setTry(std::move(t));
966 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
967 // "after" completed first, cancel "this"
968 ctx->thisFuture.raise(TimedOut());
969 if (ctx->token.exchange(true) == false) {
970 if (t.hasException()) {
971 ctx->promise.setException(std::move(t.exception()));
973 ctx->promise.setException(std::move(ctx->exception));
978 return ctx->promise.getFuture().via(getExecutor());
// Returns a Future carrying this Future's result that is produced only after
// both this Future and futures::sleep(dur, tk) have completed (collectAll).
// The sleep's own Try is ignored; only element 0 of the tuple is forwarded.
984 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
985 return collectAll(*this, futures::sleep(dur, tk))
986 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
987 Try<T>& t = std::get<0>(tup);
988 return makeFuture<T>(std::move(t));
// Untimed wait helper: returns immediately when f is already ready; otherwise
// installs a callback that posts a stack-allocated baton (captured by
// reference) and asserts readiness afterwards. The call that waits on the
// baton is on a line not shown in this listing.
995 void waitImpl(Future<T>& f) {
996 // short-circuit if there's nothing to do
997 if (f.isReady()) return;
999 FutureBatonType baton;
1000 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1002 assert(f.isReady());
// Timed wait helper. The baton is heap-allocated and shared with the callback,
// so the callback stays valid if it runs after this function has returned on
// timeout. The callback forwards the Try into a new promise whose Future is
// `ret`; the promise declaration, the baton post and the assignment of `ret`
// back to f are on lines not shown (presumably f = std::move(ret) — confirm).
// When timed_wait(dur) reports success, f is asserted ready.
1006 void waitImpl(Future<T>& f, Duration dur) {
1007 // short-circuit if there's nothing to do
1013 auto ret = promise.getFuture();
1014 auto baton = std::make_shared<FutureBatonType>();
1015 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1016 promise.setTry(std::move(t));
1020 if (baton->timed_wait(dur)) {
1021 assert(f.isReady());
// Wait helper for a DrivableExecutor: re-routes f through executor e with an
// identity continuation, then loops until f is ready. The loop body is on a
// line not shown (presumably e->drive() — confirm).
1026 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1027 // Set callback so to ensure that the via executor has something on it
1028 // so that once the preceding future triggers this callback, drive will
1029 // always have a callback to satisfy it
1032 f = f.via(e).then([](T&& t) { return std::move(t); });
1033 while (!f.isReady()) {
1036 assert(f.isReady());
// wait(): both ref-qualified overloads call detail::waitImpl(*this). The
// rvalue overload returns *this by move; the lvalue overload's return
// statement is on a line not shown in this listing.
1042 Future<T>& Future<T>::wait() & {
1043 detail::waitImpl(*this);
1048 Future<T>&& Future<T>::wait() && {
1049 detail::waitImpl(*this);
1050 return std::move(*this);
// wait(dur): both ref-qualified overloads call detail::waitImpl(*this, dur).
// The rvalue overload returns *this by move; the lvalue overload's return
// statement is on a line not shown in this listing.
1054 Future<T>& Future<T>::wait(Duration dur) & {
1055 detail::waitImpl(*this, dur);
1060 Future<T>&& Future<T>::wait(Duration dur) && {
1061 detail::waitImpl(*this, dur);
1062 return std::move(*this);
// waitVia(e): both ref-qualified overloads call detail::waitViaImpl(*this, e).
// The rvalue overload returns *this by move; the lvalue overload's return
// statement is on a line not shown in this listing.
1066 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1067 detail::waitViaImpl(*this, e);
1072 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1073 detail::waitViaImpl(*this, e);
1074 return std::move(*this);
// Calls wait() and then moves the value out of the Future.
1078 T Future<T>::get() {
1079 return std::move(wait().value());
// Timed get: the visible line moves the value out of the Future. The wait and
// the handling of a Future that is still not ready are on lines not shown.
// NOTE(review): presumably wait(dur) followed by a TimedOut throw when not
// ready — confirm.
1083 T Future<T>::get(Duration dur) {
1086 return std::move(value());
// Calls waitVia(e) and then moves the value out of the Future.
1093 T Future<T>::getVia(DrivableExecutor* e) {
1094 return std::move(waitVia(e).value());
// Compares the values held by two Trys with operator==. Future<T>::willEqual
// reaches this as detail::TryEquals<T>::equals, after checking that both Trys
// have values. The enclosing struct header is on a line not shown.
1100 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1101 return t1.value() == t2.value();
// Returns a Future<bool> produced after both this Future and f have completed
// (collectAll). When both Trys hold values the result is
// TryEquals<T>::equals on them; the result for the remaining cases is on
// lines not shown (presumably false — confirm).
1107 Future<bool> Future<T>::willEqual(Future<T>& f) {
1108 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1109 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1110 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
// Chains a continuation that evaluates the predicate on a const reference to
// the value and throws PredicateDoesNotObtain when the predicate returns
// false. The statement handling the passing case is on a line not shown
// (presumably returns val — confirm).
1119 Future<T> Future<T>::filter(F&& predicate) {
1120 return this->then([p = std::forward<F>(predicate)](T val) {
1121 T const& valConstRef = val;
1122 if (!p(valConstRef)) {
1123 throw PredicateDoesNotObtain();
// Base case of thenMulti: a single callback is forwarded straight to then().
1130 template <class Callback>
1131 auto Future<T>::thenMulti(Callback&& fn)
1132 -> decltype(this->then(std::forward<Callback>(fn))) {
1133 // thenMulti with one callback is just a then
1134 return then(std::forward<Callback>(fn));
// Recursive case of thenMulti: applies the first callback with then() and
// calls thenMulti on the resulting Future with the remaining callbacks, so
// the callbacks are chained in argument order.
1138 template <class Callback, class... Callbacks>
1139 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1140 -> decltype(this->then(std::forward<Callback>(fn)).
1141 thenMulti(std::forward<Callbacks>(fns)...)) {
1142 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1143 return then(std::forward<Callback>(fn)).
1144 thenMulti(std::forward<Callbacks>(fns)...);
// Chains several callbacks and finishes with .via(oldX), where oldX is the
// executor this Future had on entry, so the returned Future is routed back to
// the original executor.
// NOTE(review): the line that switches to executor x is not shown —
// presumably setExecutor(x) between saving oldX and the chain — confirm.
1148 template <class Callback, class... Callbacks>
1149 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1151 -> decltype(this->then(std::forward<Callback>(fn)).
1152 thenMulti(std::forward<Callbacks>(fns)...)) {
1153 // thenMultiExecutor with two callbacks is
1154 // via(x).then(a).thenMulti(b, ...).via(oldX)
1155 auto oldX = getExecutor();
1157 return then(std::forward<Callback>(fn)).
1158 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
// Single-callback case: delegates to the then(Executor*, ...) overload.
1162 template <class Callback>
1163 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1164 -> decltype(this->then(std::forward<Callback>(fn))) {
1165 // thenMulti with one callback is just a then with an executor
1166 return then(x, std::forward<Callback>(fn));
// Conditionally runs thunk: when p is true, calls thunk and converts its
// Future to Future<Unit> with unit(); when p is false, thunk is not invoked
// and makeFuture() is returned instead.
1170 inline Future<Unit> when(bool p, F&& thunk) {
1171 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// Asynchronous loop: calls thunk(), and once the Future it returns completes,
// recurses into whileDo with the captured predicate and thunk. The final
// visible line returns makeFuture() for the path that does not loop.
// NOTE(review): the predicate test guarding the thunk() call is on a line not
// shown — presumably `if (predicate())` — confirm.
1174 template <class P, class F>
1175 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1177 auto future = thunk();
1178 return future.then([
1179 predicate = std::forward<P>(predicate),
1180 thunk = std::forward<F>(thunk)
1182 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1185 return makeFuture();
// Repeats thunk through folly::whileDo with a counting predicate. The counter
// is a heap-allocated std::atomic<int> owned by the predicate lambda; each
// predicate call does fetch_add(1) and continues while the previous value was
// below n.
1189 Future<Unit> times(const int n, F&& thunk) {
1190 return folly::whileDo(
1191 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1192 return count->fetch_add(1) < n;
1194 std::forward<F>(thunk));
1198 template <class It, class F, class ItT, class Result>
1199 std::vector<Future<Result>> map(It first, It last, F func) {
1200 std::vector<Future<Result>> results;
1201 for (auto it = first; it != last; it++) {
1202 results.push_back(it->then(func));
// Tag types used to pick an overload according to what a retry policy
// returns: the raw tag for policies returning bool, the fut tag for policies
// returning Future<bool> (see retrying_policy_traits::tag).
1212 struct retrying_policy_raw_tag {};
1213 struct retrying_policy_fut_tag {};
// Classifies a retry Policy by the signature of its operator().
// has_op<Ret> is true when Policy has an operator() of type
// Ret(size_t, const exception_wrapper&), const-qualified or not.
// `tag` resolves to retrying_policy_raw_tag when the policy returns bool, to
// retrying_policy_fut_tag when it returns Future<bool>, and to void otherwise.
// The raw form takes precedence when both match.
1215 template <class Policy>
1216 struct retrying_policy_traits {
1217 using ew = exception_wrapper;
1218 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1219 template <class Ret>
1220 using has_op = typename std::integral_constant<bool,
1221 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1222 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1223 using is_raw = has_op<bool>;
1224 using is_fut = has_op<Future<bool>>;
1225 using tag = typename std::conditional<
1226 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1227 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// One attempt of the retry loop.
// ff(k) is invoked through makeFutureWith, with k post-incremented so the
// policy and the next attempt see the updated count. In the continuation a
// successful Try fulfils prom with its value. Otherwise the exception is
// taken from the Try and a second continuation receives the policy's bool
// decision: it either recurses into retryingImpl, moving the policy, the
// factory and the promise along, or fails prom with the saved exception.
// The policy call and the branch conditions are on lines not shown.
1230 template <class Policy, class FF, class Prom>
1231 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1232 using F = typename std::result_of<FF(size_t)>::type;
1233 using T = typename F::value_type;
1234 auto f = makeFutureWith([&] { return ff(k++); });
1237 prom = std::move(prom),
1238 pm = std::forward<Policy>(p),
1239 ffm = std::forward<FF>(ff)
1240 ](Try<T> && t) mutable {
1242 prom.setValue(std::move(t).value());
1245 auto& x = t.exception();
1249 prom = std::move(prom),
1252 ffm = std::move(ffm)
1253 ](bool shouldRetry) mutable {
1255 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1257 prom.setException(std::move(xm));
// Entry point of the retry loop for a Future-returning policy: creates the
// Promise<T> for the overall result, takes its Future f, and passes k, the
// policy, the factory and the promise on (the callee name is on a line not
// shown; the argument list matches retryingImpl). Returning f is on a line
// not shown.
1263 template <class Policy, class FF>
1264 typename std::result_of<FF(size_t)>::type
1265 retrying(size_t k, Policy&& p, FF&& ff) {
1266 using F = typename std::result_of<FF(size_t)>::type;
1267 using T = typename F::value_type;
1268 auto prom = Promise<T>();
1269 auto f = prom.getFuture();
1271 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
// Raw-policy overload: adapts a bool-returning policy into a Future<bool>
// policy by wrapping its result in makeFuture<bool>, then starts the retry
// loop at attempt count 0.
1275 template <class Policy, class FF>
1276 typename std::result_of<FF(size_t)>::type
1277 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1278 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1279 return makeFuture<bool>(pm(k, x));
1281 return retrying(0, std::move(q), std::forward<FF>(ff));
// Future-policy overload: the policy already returns Future<bool>, so it is
// forwarded unchanged and the retry loop starts at attempt count 0.
1284 template <class Policy, class FF>
1285 typename std::result_of<FF(size_t)>::type
1286 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1287 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
// Computes the backoff for attempt n: backoff_min.count() * 2^(n - 1),
// multiplied by a jitter factor exp(x) where x is drawn from a normal
// distribution with mean 0 and standard deviation jitter_param. The product is
// converted to the duration type `d` (alias declared on a line not shown) and
// clamped to [backoff_min, backoff_max].
// NOTE(review): `n - 1` assumes n >= 1; if n is an unsigned type and 0 is
// passed the exponent wraps around — confirm callers never pass 0.
1290 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1291 template <class URNG>
1292 Duration retryingJitteredExponentialBackoffDur(
1294 Duration backoff_min,
1295 Duration backoff_max,
1296 double jitter_param,
1299 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1300 auto jitter = std::exp(dist(rng));
1301 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1302 return std::max(backoff_min, std::min(backoff_max, backoff));
// Builds the composite retry policy. The returned callable:
//  - answers false as soon as n equals max_tries;
//  - otherwise consults the wrapped policy pm(n, ex); one branch of the
//    continuation answers false (its condition is on a line not shown,
//    presumably when pm declines — confirm);
//  - otherwise sleeps for the jittered exponential backoff for attempt n and
//    then answers true.
// The random generator is moved into the outer lambda and then moved again
// into the inner continuation.
// NOTE(review): that second move leaves rngp moved-from for any later call of
// the same policy object that reaches this branch — confirm intended.
1305 template <class Policy, class URNG>
1306 std::function<Future<bool>(size_t, const exception_wrapper&)>
1307 retryingPolicyCappedJitteredExponentialBackoff(
1309 Duration backoff_min,
1310 Duration backoff_max,
1311 double jitter_param,
1315 pm = std::forward<Policy>(p),
1320 rngp = std::forward<URNG>(rng)
1321 ](size_t n, const exception_wrapper& ex) mutable {
1322 if (n == max_tries) {
1323 return makeFuture(false);
1325 return pm(n, ex).then(
1326 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1329 return makeFuture(false);
1331 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1332 n, backoff_min, backoff_max, jitter_param, rngp);
1333 return futures::sleep(backoff).then([] { return true; });
// Raw-tag overload: wraps a bool-returning policy in a lambda that lifts its
// answer with makeFuture, then delegates to the
// retryingPolicyCappedJitteredExponentialBackoff overload that takes a
// Future-returning policy. Several forwarded arguments are on lines not
// shown.
1338 template <class Policy, class URNG>
1339 std::function<Future<bool>(size_t, const exception_wrapper&)>
1340 retryingPolicyCappedJitteredExponentialBackoff(
1342 Duration backoff_min,
1343 Duration backoff_max,
1344 double jitter_param,
1347 retrying_policy_raw_tag) {
1348 auto q = [pm = std::forward<Policy>(p)](
1349 size_t n, const exception_wrapper& e) {
1350 return makeFuture(pm(n, e));
1352 return retryingPolicyCappedJitteredExponentialBackoff(
1357 std::forward<URNG>(rng),
// Fut-tag overload: the policy already returns Future<bool>, so the generator
// and the policy are forwarded unchanged to the implementation overload.
// Several forwarded arguments are on lines not shown.
1361 template <class Policy, class URNG>
1362 std::function<Future<bool>(size_t, const exception_wrapper&)>
1363 retryingPolicyCappedJitteredExponentialBackoff(
1365 Duration backoff_min,
1366 Duration backoff_max,
1367 double jitter_param,
1370 retrying_policy_fut_tag) {
1371 return retryingPolicyCappedJitteredExponentialBackoff(
1376 std::forward<URNG>(rng),
1377 std::forward<Policy>(p));
// Public entry point: looks up the policy's tag with
// detail::retrying_policy_traits and dispatches to the matching
// detail::retrying overload.
// The tag is void when the policy matches neither supported signature, in
// which case `tag()` does not compile.
1381 template <class Policy, class FF>
1382 typename std::result_of<FF(size_t)>::type
1383 retrying(Policy&& p, FF&& ff) {
1384 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1385 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
// Returns a policy that ignores the exception and allows a retry while the
// attempt count n is below max_tries (captured by copy; the parameter
// declaration is on a line not shown).
1389 std::function<bool(size_t, const exception_wrapper&)>
1390 retryingPolicyBasic(
1392 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
// Public overload taking a user policy and a random generator: looks up the
// policy's tag with detail::retrying_policy_traits and dispatches to the
// matching detail:: overload. Several forwarded arguments are on lines not
// shown.
1395 template <class Policy, class URNG>
1396 std::function<Future<bool>(size_t, const exception_wrapper&)>
1397 retryingPolicyCappedJitteredExponentialBackoff(
1399 Duration backoff_min,
1400 Duration backoff_max,
1401 double jitter_param,
1404 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1405 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1410 std::forward<URNG>(rng),
1411 std::forward<Policy>(p),
// Convenience overload without a user policy: uses a policy that always
// answers true and delegates to the overload that takes a policy. The
// arguments passed in that call are on lines not shown.
1416 std::function<Future<bool>(size_t, const exception_wrapper&)>
1417 retryingPolicyCappedJitteredExponentialBackoff(
1419 Duration backoff_min,
1420 Duration backoff_max,
1421 double jitter_param) {
1422 auto p = [](size_t, const exception_wrapper&) { return true; };
1423 return retryingPolicyCappedJitteredExponentialBackoff(
// These are explicit instantiation *declarations* (extern template): they
// stop each including translation unit from implicitly instantiating these
// specialisations. The matching explicit instantiation definitions are not in
// this listing and must exist in some translation unit.
1434 // Instantiate the most common Future types to save compile time
1435 extern template class Future<Unit>;
1436 extern template class Future<bool>;
1437 extern template class Future<int>;
1438 extern template class Future<int64_t>;
1439 extern template class Future<std::string>;
1440 extern template class Future<double>;
1442 } // namespace folly