2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
// Chooses the baton type used by the blocking wait helpers further down.
// FOLLY_FUTURE_USING_FIBER is defined as 0 for FOLLY_MOBILE / Apple builds
// and as 1 otherwise (in which case the fiber baton header is included);
// FutureBatonType is then the fiber baton or the plain folly::Baton<>.
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
// Declaration only; the definition is not in this file section. within()
// further down calls folly::detail::getTimekeeperSingleton() to obtain a
// Timekeeper.
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
// Move constructor: adopts other's core pointer and then nulls other.core_,
// so only the new Future refers to the shared state.
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56 other.core_ = nullptr;
// Move assignment: exchanges the two core pointers with std::swap, so
// `other` ends up holding whatever core this Future had before the call.
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61 std::swap(core_, other.core_);
// Converting move constructor from Future<U>: the source core pointer is
// passed through detail::Core<T>::convert and other.core_ is then cleared.
// NOTE(review): the unnamed second template parameter is presumably an
// enable_if-style constraint declared with the class -- confirm.
66 template <typename U, typename>
67 Future<T>::Future(Future<U>&& other) noexcept
68 : core_(detail::Core<T>::convert(other.core_)) {
69 other.core_ = nullptr;
73 template <typename U, typename>
74 Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
75 std::swap(core_, detail::Core<T>::convert(other.core_));
// Constructs an already-fulfilled Future: allocates a new Core that holds
// a Try<T> built from the forwarded value.
80 template <class T2, typename>
81 Future<T>::Future(T2&& val)
82 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Constructor whose only parameter type exists when T2 is Unit: allocates
// a new Core holding a Try<T> of a default-constructed T.
85 template <typename T2>
86 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
87 : core_(new detail::Core<T>(Try<T>(T()))) {}
// Destructor.
// NOTE(review): the body is not visible in this excerpt; presumably it
// calls detach() -- confirm.
90 Future<T>::~Future() {
// Notifies the core through detachFuture().
// NOTE(review): a null check on core_ presumably guards this call and
// core_ is presumably cleared afterwards -- those lines are not visible.
95 void Future<T>::detach() {
97 core_->detachFuture();
// Validity check used by the accessors.
// NOTE(review): the body is not visible here; judging by the name it
// throws when the Future has no core -- confirm.
103 void Future<T>::throwIfInvalid() const {
// Installs func as the callback stored in the core, perfectly forwarding
// it. No new Future is created by this call.
110 void Future<T>::setCallback_(F&& func) {
112 core_->setCallback(std::forward<F>(func));
// Turns a Future whose value is itself a Future into a Future of the
// inner type: chains a continuation that receives the inner Future and
// returns it unchanged.
119 typename std::enable_if<isFuture<F>::value,
120 Future<typename isFuture<T>::Inner>>::type
121 Future<T>::unwrap() {
122 return then([](Future<typename isFuture<T>::Inner> internal_future) {
123 return internal_future;
// then() implementation for callbacks that return a plain value (selected
// when R::ReturnsFuture::value is false). A promise `p` is set up with
// this core's interrupt handler, its Future inherits this Future's
// executor, and a callback is installed that fulfils the promise.
129 // Variant: returns a value
130 // e.g. f.then([](Try<T>&& t){ return t.value(); });
132 template <typename F, typename R, bool isTry, typename... Args>
133 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
134 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
135 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
136 typedef typename R::ReturnsFuture::Inner B;
// Copy the interrupt handler of this core onto the new promise's core.
141 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
143 // grab the Future now before we lose our handle on the Promise
144 auto f = p.getFuture();
// The returned Future uses the same executor as this one.
145 f.core_->setExecutorNoLock(getExecutor());
147 /* This is a bit tricky.
149 We can't just close over *this in case this Future gets moved. So we
150 make a new dummy Future. We could figure out something more
151 sophisticated that avoids making a new Future object when it can, as an
152 optimization. But this is correct.
154 core_ can't be moved, it is explicitly disallowed (as is copying). But
155 if there's ever a reason to allow it, this is one place that makes that
156 assumption and would need to be fixed. We use a standard shared pointer
157 for core_ (by copying it in), which means in essence obj holds a shared
158 pointer to itself. But this shouldn't leak because Promise will not
159 outlive the continuation, because Promise will setException() with a
160 broken Promise if it is destructed before completed. We could use a
161 weak pointer but it would have to be converted to a shared pointer when
162 func is executed (because the Future returned by func may possibly
163 persist beyond the callback, if it gets moved), and so it is an
164 optimization to just make it shared from the get-go.
166 Two subtle but important points about this design. detail::Core has no
167 back pointers to Future or Promise, so if Future or Promise get moved
168 (and they will be moved in performant code) we don't have to do
169 anything fancy. And because we store the continuation in the
170 detail::Core, not in the Future, we can execute the continuation even
171 after the Future has gone out of scope. This is an intentional design
172 decision. It is likely we will want to be able to cancel a continuation
173 in some circumstances, but I think it should be explicit not implicit
174 in the destruction of the Future used to create it.
// Continuation owning func and the promise. When func does not take a Try
// (isTry is false) and t holds an exception, the exception is passed on
// to pm without calling func; the setWith() line fulfils pm with the
// result of calling func on the value or Try extracted from t.
176 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
177 Try<T> && t) mutable {
178 if (!isTry && t.hasException()) {
179 pm.setException(std::move(t.exception()));
181 pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
// then() implementation for callbacks that themselves return a Future
// (selected when R::ReturnsFuture::value is true). The promise `p` is
// completed later, from a callback installed on the Future that func
// returns.
188 // Variant: returns a Future
189 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
191 template <typename F, typename R, bool isTry, typename... Args>
192 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
193 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
194 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
195 typedef typename R::ReturnsFuture::Inner B;
// Copy the interrupt handler of this core onto the new promise's core.
200 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
202 // grab the Future now before we lose our handle on the Promise
203 auto f = p.getFuture();
// The returned Future uses the same executor as this one.
204 f.core_->setExecutorNoLock(getExecutor());
// Continuation: the visible return statements all produce an
// exception_wrapper -- the incoming exception when func does not take a
// Try, an empty wrapper after func's Future has been hooked up to the
// promise, or the exception caught while calling func. The final line
// forwards a wrapper `ew` into pm.
// NOTE(review): the try / inner-lambda scaffolding and the test that
// decides whether ew is forwarded are not visible here.
206 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
207 Try<T> && t) mutable {
209 if (!isTry && t.hasException()) {
210 return std::move(t.exception());
213 auto f2 = funcm(t.template get<isTry, Args>()...);
214 // that didn't throw, now we can steal p
215 f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
216 p.setTry(std::move(b));
218 return exception_wrapper();
219 } catch (const std::exception& e) {
220 return exception_wrapper(std::current_exception(), e);
222 return exception_wrapper(std::current_exception());
227 pm.setException(std::move(ew));
// then() overload taking a pointer to member function plus the object to
// call it on. FirstArg is the member function's first parameter type with
// references and cv-qualifiers stripped; isTry<FirstArg> decides whether
// the callback receives the Try itself or the value extracted from it.
// The raw `instance` pointer is captured as-is by the continuation.
234 template <typename T>
235 template <typename R, typename Caller, typename... Args>
236 Future<typename isFuture<R>::Inner>
237 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
238 typedef typename std::remove_cv<
239 typename std::remove_reference<
240 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
241 return then([instance, func](Try<T>&& t){
242 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// then() overload whose first argument is an executor. The current
// executor is saved in oldX and the remaining arguments are forwarded to
// the ordinary then().
// NOTE(review): the statement that installs x and the call chained after
// then(...) are not visible here; presumably the callback runs on x and
// the result goes back to oldX -- confirm.
247 template <class Executor, class Arg, class... Args>
248 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
249 -> decltype(this->then(std::forward<Arg>(arg),
250 std::forward<Args>(args)...))
252 auto oldX = getExecutor();
254 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
// Argument-less then(): chains an empty callback, producing a Future<Unit>.
259 Future<Unit> Future<T>::then() {
260 return then([] () {});
// Selected when func is not callable with exception_wrapper and does not
// return a Future. Exn is func's first argument type. If the Try holds an
// exception matching Exn, the promise is fulfilled with func(e) through
// setWith(); otherwise the Try is passed through untouched.
263 // onError where the callback returns T
266 typename std::enable_if<
267 !detail::callableWith<F, exception_wrapper>::value &&
268 !detail::Extract<F>::ReturnsFuture::value,
270 Future<T>::onError(F&& func) {
271 typedef typename detail::Extract<F>::FirstArg Exn;
273 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
274 "Return type of onError callback must be T or Future<T>");
// Copy the interrupt handler of this core onto the new promise's core.
277 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
278 auto f = p.getFuture();
280 setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
281 Try<T> && t) mutable {
// withException() returning false means no handler ran, so forward t.
282 if (!t.template withException<Exn>(
283 [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
284 pm.setTry(std::move(t));
// Selected when func is not callable with exception_wrapper and returns a
// Future. When the Try holds an exception matching Exn (func's first
// argument type), the Future produced by the handler is hooked up to
// complete the promise; a Try that was not handled is forwarded as is.
// NOTE(review): the handler invocation and the try / inner-lambda
// scaffolding are not visible in this excerpt.
291 // onError where the callback returns Future<T>
294 typename std::enable_if<
295 !detail::callableWith<F, exception_wrapper>::value &&
296 detail::Extract<F>::ReturnsFuture::value,
298 Future<T>::onError(F&& func) {
300 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
301 "Return type of onError callback must be T or Future<T>");
302 typedef typename detail::Extract<F>::FirstArg Exn;
305 auto f = p.getFuture();
307 setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
308 Try<T> && t) mutable {
309 if (!t.template withException<Exn>([&](Exn& e) {
// Completion of the handler's Future f2 fulfils the promise.
313 f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
314 pm.setTry(std::move(t2));
316 return exception_wrapper();
317 } catch (const std::exception& e2) {
318 return exception_wrapper(std::current_exception(), e2);
320 return exception_wrapper(std::current_exception());
324 pm.setException(std::move(ew));
327 pm.setTry(std::move(t));
// Chains a continuation that takes the Try (so it is reached for both a
// value and an exception) and re-emits that same Try via makeFuture.
// NOTE(review): the call to funcw is not visible here; presumably it is
// invoked before the Try is returned -- confirm.
336 Future<T> Future<T>::ensure(F&& func) {
337 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
339 return makeFuture(std::move(t));
// Applies within(dur, tk) and attaches an onError handler for TimedOut
// that yields funcw() as the replacement result.
345 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
346 return within(dur, tk).onError([funcw = std::forward<F>(func)](
347 TimedOut const&) { return funcw(); });
// Selected when func is callable with exception_wrapper and returns a
// Future. Any exception in the Try is moved into func; the Future it
// returns completes the promise. A Try without an exception is forwarded.
// NOTE(review): the setCallback_ call line and the try / inner-lambda
// scaffolding are not visible in this excerpt.
352 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
353 detail::Extract<F>::ReturnsFuture::value,
355 Future<T>::onError(F&& func) {
357 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
358 "Return type of onError callback must be T or Future<T>");
361 auto f = p.getFuture();
363 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
364 if (t.hasException()) {
367 auto f2 = funcm(std::move(t.exception()));
// Completion of the handler's Future f2 fulfils the promise.
368 f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
369 pm.setTry(std::move(t2));
371 return exception_wrapper();
372 } catch (const std::exception& e2) {
373 return exception_wrapper(std::current_exception(), e2);
375 return exception_wrapper(std::current_exception());
379 pm.setException(std::move(ew));
382 pm.setTry(std::move(t));
// Selected when func is callable with exception_wrapper and does not
// return a Future. If the Try holds an exception, the promise is
// fulfilled with func(exception) through setWith(); otherwise the Try is
// forwarded unchanged.
389 // onError(exception_wrapper) that returns T
392 typename std::enable_if<
393 detail::callableWith<F, exception_wrapper>::value &&
394 !detail::Extract<F>::ReturnsFuture::value,
396 Future<T>::onError(F&& func) {
398 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
399 "Return type of onError callback must be T or Future<T>");
402 auto f = p.getFuture();
404 [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
405 if (t.hasException()) {
406 pm.setWith([&] { return funcm(std::move(t.exception())); });
408 pm.setTry(std::move(t));
// Returns an lvalue reference to the value held by the core's Try.
// NOTE(review): any validity check preceding the return is not visible.
416 typename std::add_lvalue_reference<T>::type Future<T>::value() {
419 return core_->getTry().value();
// Const overload: returns a const lvalue reference to the value held by
// the core's Try.
423 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
426 return core_->getTry().value();
// Returns a reference to the Try stored in the core.
// NOTE(review): any validity check preceding the return is not visible.
430 Try<T>& Future<T>::getTry() {
433 return core_->getTry();
// Runs waitVia(e) and then returns the Try of the resulting Future.
437 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
438 return waitVia(e).getTry();
// Non-blocking check: when the core reports ready(), its Try is moved
// into the Optional `o`.
// NOTE(review): the declaration and return of `o` are not visible here;
// presumably an empty Optional is returned when not ready -- confirm.
442 Optional<Try<T>> Future<T>::poll() {
444 if (core_->ready()) {
445 o = std::move(core_->getTry());
// Rvalue overload: sets the executor and priority on this Future and
// returns the Future itself by move.
451 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
454 setExecutor(executor, priority);
456 return std::move(*this);
// Lvalue overload: obtains a Future `f` from a promise, chains a
// continuation on *this that forwards the Try into that promise, and
// returns `f` after applying the rvalue via() to it.
460 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
464 auto f = p.getFuture();
465 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
466 return std::move(f).via(executor, priority);
// Free function: runs func through via(x).then(func). The return type is
// a Future of func()'s result, with one Future layer removed when func
// itself returns a Future.
469 template <class Func>
470 auto via(Executor* x, Func&& func)
471 -> Future<typename isFuture<decltype(func())>::Inner>
473 // TODO make this actually more performant. :-P #7260175
474 return via(x).then(std::forward<Func>(func));
// Reports whether the core is ready().
// NOTE(review): any validity check preceding the return is not visible.
478 bool Future<T>::isReady() const {
480 return core_->ready();
// True when the Try obtained from getTry() holds a value.
484 bool Future<T>::hasValue() {
485 return getTry().hasValue();
// True when the Try obtained from getTry() holds an exception.
489 bool Future<T>::hasException() {
490 return getTry().hasException();
// Hands the exception to the core's raise(), moving it in.
494 void Future<T>::raise(exception_wrapper exception) {
495 core_->raise(std::move(exception));
// Makes a Future of the decayed type from a value: wraps the forwarded
// value in a Try and delegates to the Try overload of makeFuture.
501 Future<typename std::decay<T>::type> makeFuture(T&& t) {
502 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Argument-less makeFuture: a Future<Unit> built from Unit{}.
505 inline // for multiple translation units
506 Future<Unit> makeFuture() {
507 return makeFuture(Unit{});
// Overload for callables that already return a Future. The two visible
// handlers convert a thrown std::exception, or any other thrown object,
// into a failed Future of the inner type instead of letting it propagate.
// NOTE(review): the try block that invokes func is not visible here.
510 // makeFutureWith(Future<T>()) -> Future<T>
512 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
513 typename std::result_of<F()>::type>::type
514 makeFutureWith(F&& func) {
516 typename isFuture<typename std::result_of<F()>::type>::Inner;
519 } catch (std::exception& e) {
520 return makeFuture<InnerType>(
521 exception_wrapper(std::current_exception(), e));
523 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
// Overload for callables that do not return a Future. The result type is
// passed through Unit::Lift, and the Future is built from the Try that
// makeTryWith produces for a lambda wrapping func.
527 // makeFutureWith(T()) -> Future<T>
528 // makeFutureWith(void()) -> Future<Unit>
530 typename std::enable_if<
531 !(isFuture<typename std::result_of<F()>::type>::value),
532 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
533 makeFutureWith(F&& func) {
535 typename Unit::Lift<typename std::result_of<F()>::type>::type;
536 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
// Makes a Future from a std::exception_ptr by wrapping it in a Try<T>.
542 Future<T> makeFuture(std::exception_ptr const& e) {
543 return makeFuture(Try<T>(e));
// Makes a Future from an exception_wrapper, which is moved into a Try<T>.
547 Future<T> makeFuture(exception_wrapper ew) {
548 return makeFuture(Try<T>(std::move(ew)));
// Makes a Future from an exception object. Enabled only for types derived
// from std::exception; the object is wrapped with
// make_exception_wrapper<E> and stored in a Try<T>.
551 template <class T, class E>
552 typename std::enable_if<std::is_base_of<std::exception, E>::value,
554 makeFuture(E const& e) {
555 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload the other makeFuture variants funnel into: allocates a
// new Core from the moved Try and wraps it in a Future.
559 Future<T> makeFuture(Try<T>&& t) {
560 return Future<T>(new detail::Core<T>(std::move(t)));
// Creates a Future<Unit> with makeFuture() and applies via(executor,
// priority) to it.
564 Future<Unit> via(Executor* executor, int8_t priority) {
565 return makeFuture().via(executor, priority);
// Walks [first, last) and installs a callback on each Future. `i` is the
// zero-based position of the Future in the range; func is copied into
// every callback and receives that index together with the Try.
568 // mapSetCallback calls func(i, Try<T>) when every future completes
570 template <class T, class InputIterator, class F>
571 void mapSetCallback(InputIterator first, InputIterator last, F func) {
572 for (size_t i = 0; first != last; ++first, ++i) {
573 first->setCallback_([func, i](Try<T>&& t) {
574 func(i, std::move(t));
// Variadic form: creates a shared detail::CollectAllVariadicContext for
// the value types of the given Futures, passes the Futures to
// detail::collectVariadicHelper, and returns the Future of the context's
// promise.
579 // collectAll (variadic)
581 template <typename... Fs>
582 typename detail::CollectAllVariadicContext<
583 typename std::decay<Fs>::type::value_type...>::type
584 collectAll(Fs&&... fs) {
585 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
586 typename std::decay<Fs>::type::value_type...>>();
587 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
588 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
589 return ctx->p.getFuture();
592 // collectAll (iterator)
594 template <class InputIterator>
597 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
598 collectAll(InputIterator first, InputIterator last) {
600 typename std::iterator_traits<InputIterator>::value_type::value_type T;
602 struct CollectAllContext {
603 CollectAllContext(int n) : results(n) {}
604 ~CollectAllContext() {
605 p.setValue(std::move(results));
607 Promise<std::vector<Try<T>>> p;
608 std::vector<Try<T>> results;
611 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
612 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
613 ctx->results[i] = std::move(t);
615 return ctx->p.getFuture();
// Shared state for the iterator form of collect(). Result is the type
// delivered to the caller; InternalResult is the storage filled in while
// Futures complete (a vector of Optional<T> when T is not void). `threw`
// records that the promise has already been settled.
// NOTE(review): the constructor takes an int while collect() passes the
// result of std::distance -- a narrowing conversion worth confirming.
618 // collect (iterator)
622 template <typename T>
623 struct CollectContext {
625 explicit Nothing(int /* n */) {}
628 using Result = typename std::conditional<
629 std::is_void<T>::value,
631 std::vector<T>>::type;
633 using InternalResult = typename std::conditional<
634 std::is_void<T>::value,
636 std::vector<Optional<T>>>::type;
638 explicit CollectContext(int n) : result(n) {}
// Only the first party to flip `threw` publishes: the partial results
// are unwrapped from their Optionals and set on the promise.
640 if (!threw.exchange(true)) {
641 // map Optional<T> -> T
642 std::vector<T> finalResult;
643 finalResult.reserve(result.size());
644 std::transform(result.begin(), result.end(),
645 std::back_inserter(finalResult),
646 [](Optional<T>& o) { return std::move(o.value()); });
647 p.setValue(std::move(finalResult));
// Stores the value of a completed input in the slot for position i.
650 inline void setPartialResult(size_t i, Try<T>& t) {
651 result[i] = std::move(t.value());
654 InternalResult result;
655 std::atomic<bool> threw {false};
// Iterator form of collect(): creates a shared detail::CollectContext
// sized by the range and installs a callback on every Future. The first
// exception seen (the first to flip `threw`) fails the promise; a value
// arriving while `threw` is still false is stored in its slot.
660 template <class InputIterator>
661 Future<typename detail::CollectContext<
662 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
663 collect(InputIterator first, InputIterator last) {
665 typename std::iterator_traits<InputIterator>::value_type::value_type T;
667 auto ctx = std::make_shared<detail::CollectContext<T>>(
668 std::distance(first, last));
669 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
670 if (t.hasException()) {
671 if (!ctx->threw.exchange(true)) {
672 ctx->p.setException(std::move(t.exception()));
674 } else if (!ctx->threw) {
675 ctx->setPartialResult(i, t);
678 return ctx->p.getFuture();
// Variadic form: creates a shared detail::CollectVariadicContext for the
// value types of the given Futures, passes the Futures to
// detail::collectVariadicHelper, and returns the Future of the context's
// promise.
681 // collect (variadic)
683 template <typename... Fs>
684 typename detail::CollectVariadicContext<
685 typename std::decay<Fs>::type::value_type...>::type
686 collect(Fs&&... fs) {
687 auto ctx = std::make_shared<detail::CollectVariadicContext<
688 typename std::decay<Fs>::type::value_type...>>();
689 detail::collectVariadicHelper<detail::CollectVariadicContext>(
690 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
691 return ctx->p.getFuture();
// Completes with the first input Future to finish, whether it holds a
// value or an exception. The atomic `done` flag lets exactly one callback
// set the promise; the result is a pair of the input's index and its Try.
694 // collectAny (iterator)
696 template <class InputIterator>
701 std::iterator_traits<InputIterator>::value_type::value_type>>>
702 collectAny(InputIterator first, InputIterator last) {
704 typename std::iterator_traits<InputIterator>::value_type::value_type T;
706 struct CollectAnyContext {
707 CollectAnyContext() {}
708 Promise<std::pair<size_t, Try<T>>> p;
709 std::atomic<bool> done {false};
712 auto ctx = std::make_shared<CollectAnyContext>();
713 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
714 if (!ctx->done.exchange(true)) {
715 ctx->p.setValue(std::make_pair(i, std::move(t)));
718 return ctx->p.getFuture();
// Completes with the index and value of the first input that finishes
// without an exception. Callbacks that do not win (an exception, or a
// value arriving after `done` was set) bump nFulfilled; when that count
// reaches nTotal the promise is failed with the exception of the Try
// being processed. The winning callback does not bump the counter, so the
// count cannot reach nTotal once some input has succeeded.
721 // collectAnyWithoutException (iterator)
723 template <class InputIterator>
726 typename std::iterator_traits<InputIterator>::value_type::value_type>>
727 collectAnyWithoutException(InputIterator first, InputIterator last) {
729 typename std::iterator_traits<InputIterator>::value_type::value_type T;
731 struct CollectAnyWithoutExceptionContext {
732 CollectAnyWithoutExceptionContext(){}
733 Promise<std::pair<size_t, T>> p;
734 std::atomic<bool> done{false};
735 std::atomic<size_t> nFulfilled{0};
739 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
740 ctx->nTotal = std::distance(first, last);
742 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
743 if (!t.hasException() && !ctx->done.exchange(true)) {
744 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
745 } else if (++ctx->nFulfilled == ctx->nTotal) {
746 ctx->p.setException(t.exception());
749 return ctx->p.getFuture();
// Collects (index, Try) pairs from the inputs as they complete. When the
// range holds fewer than n Futures the promise is failed with a
// std::runtime_error. Otherwise each callback increments the atomic
// `completed` counter and appends its result to the shared vector, which
// is moved into the promise.
// NOTE(review): the comparisons of `c` against n that gate the
// emplace_back and setTry lines are not visible in this excerpt.
752 // collectN (iterator)
754 template <class InputIterator>
755 Future<std::vector<std::pair<size_t, Try<typename
756 std::iterator_traits<InputIterator>::value_type::value_type>>>>
757 collectN(InputIterator first, InputIterator last, size_t n) {
759 std::iterator_traits<InputIterator>::value_type::value_type T;
760 typedef std::vector<std::pair<size_t, Try<T>>> V;
762 struct CollectNContext {
764 std::atomic<size_t> completed = {0};
767 auto ctx = std::make_shared<CollectNContext>();
769 if (size_t(std::distance(first, last)) < n) {
770 ctx->p.setException(std::runtime_error("Not enough futures"));
772 // for each completed Future, increase count and add to vector, until we
773 // have n completed futures at which point we fulfil our Promise with the
775 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
776 auto c = ++ctx->completed;
778 assert(ctx->v.size() < n);
779 ctx->v.emplace_back(i, std::move(t));
781 ctx->p.setTry(Try<V>(std::move(ctx->v)));
787 return ctx->p.getFuture();
// Folds a range of Futures in order. func is moved into a shared_ptr so
// that every chained continuation can call the same callable. The first
// Future is combined with `initial`; each later Future is paired with the
// running result through collectAll and combined in turn. IsTry selects
// whether func receives the element's Try or its value.
// NOTE(review): the condition guarding the early makeFuture(initial)
// return is not visible; presumably it is the empty-range case -- confirm.
792 template <class It, class T, class F>
793 Future<T> reduce(It first, It last, T&& initial, F&& func) {
795 return makeFuture(std::move(initial));
798 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
800 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
803 typedef isTry<Arg> IsTry;
805 auto sfunc = std::make_shared<F>(std::move(func));
807 auto f = first->then(
808 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
810 std::move(minitial), head.template get<IsTry::value, Arg&&>());
813 for (++first; first != last; ++first) {
814 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
815 return (*sfunc)(std::move(std::get<0>(t).value()),
816 // Either return a ItT&& or a Try<ItT>&& depending
817 // on the type of the argument of func.
818 std::get<1>(t).template get<IsTry::value, Arg&&>());
// Applies func to every element of `input` with a bounded number of
// calls in flight. A shared WindowContext owns the input, one promise per
// element and the atomic index of the next element to start. spawn()
// claims an index, calls func on that element, and when the resulting
// Future completes it fulfils the matching promise and calls spawn()
// again. min(n, input.size()) chains are started up front, and the
// returned vector holds the Future of every promise.
825 // window (collection)
827 template <class Collection, class F, class ItT, class Result>
828 std::vector<Future<Result>>
829 window(Collection input, F func, size_t n) {
830 struct WindowContext {
831 WindowContext(Collection&& i, F&& fn)
832 : input_(std::move(i)), promises_(input_.size()),
835 std::atomic<size_t> i_ {0};
837 std::vector<Promise<Result>> promises_;
840 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
// Atomically claim the next unprocessed index.
841 size_t i = ctx->i_++;
842 if (i < ctx->input_.size()) {
843 // Using setCallback_ directly since we don't need the Future
844 ctx->func_(std::move(ctx->input_[i])).setCallback_(
845 // ctx is captured by value
846 [ctx, i](Try<Result>&& t) {
847 ctx->promises_[i].setTry(std::move(t));
848 // Chain another future onto this one
849 spawn(std::move(ctx));
855 auto max = std::min(n, input.size());
857 auto ctx = std::make_shared<WindowContext>(
858 std::move(input), std::move(func));
860 for (size_t i = 0; i < max; ++i) {
861 // Start the first n Futures
862 WindowContext::spawn(ctx);
865 std::vector<Future<Result>> futures;
866 futures.reserve(ctx->promises_.size());
867 for (auto& promise : ctx->promises_) {
868 futures.emplace_back(promise.getFuture());
// Member reduce: inside the continuation the accumulator starts as the
// moved initial value and is updated with mfunc(accumulator, element) for
// every element of `vals`.
// NOTE(review): the then(...) call line and the lambda's parameter list
// are not visible here; `vals` is presumably this Future's value.
877 template <class I, class F>
878 Future<I> Future<T>::reduce(I&& initial, F&& func) {
880 minitial = std::forward<I>(initial),
881 mfunc = std::forward<F>(func)
883 auto ret = std::move(minitial);
884 for (auto& val : vals) {
885 ret = mfunc(std::move(ret), std::move(val));
// Folds a range of Futures in completion order rather than input order.
// The shared context carries the running result as a Future (memo_), the
// reducing function, counters and the output promise. Each completion
// takes the spin lock and chains one more reduction step onto memo_; the
// completion that brings numThens_ up to numFutures_ also arranges for
// the final memo_ to fulfil the promise.
// NOTE(review): the condition guarding the early makeFuture(initial)
// return is not visible; presumably it is the empty-range case -- confirm.
891 // unorderedReduce (iterator)
893 template <class It, class T, class F, class ItT, class Arg>
894 Future<T> unorderedReduce(It first, It last, T initial, F func) {
896 return makeFuture(std::move(initial));
899 typedef isTry<Arg> IsTry;
901 struct UnorderedReduceContext {
902 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
903 : lock_(), memo_(makeFuture<T>(std::move(memo))),
904 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
906 folly::MicroSpinLock lock_; // protects memo_ and numThens_
909 size_t numThens_; // how many Futures completed and called .then()
910 size_t numFutures_; // how many Futures in total
914 auto ctx = std::make_shared<UnorderedReduceContext>(
915 std::move(initial), std::move(func), std::distance(first, last));
920 [ctx](size_t /* i */, Try<ItT>&& t) {
921 // Futures can be completed in any order, simultaneously.
922 // To make this non-blocking, we create a new Future chain in
923 // the order of completion to reduce the values.
924 // The spinlock just protects chaining a new Future, not actually
925 // executing the reduce, which should be really fast.
926 folly::MSLGuard lock(ctx->lock_);
928 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
929 // Either return a ItT&& or a Try<ItT>&& depending
930 // on the type of the argument of func.
931 return ctx->func_(std::move(v),
932 mt.template get<IsTry::value, Arg&&>());
934 if (++ctx->numThens_ == ctx->numFutures_) {
935 // After reducing the value of the last Future, fulfill the Promise
936 ctx->memo_.setCallback_(
937 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
941 return ctx->promise_.getFuture();
// Convenience overload: delegates to within(dur, e, tk) with a
// default-constructed TimedOut as the exception.
947 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
948 return within(dur, TimedOut(), tk);
// Races this Future against a timer of length dur. A shared Context holds
// the caller's exception, the output promise and an atomic token; the
// first side to flip the token settles the promise. If this Future wins,
// its Try is forwarded. If the timer wins, TimedOut is raised on the
// continuation Future and the promise fails with the timer's own
// exception if it has one, or with the caller's exception otherwise. The
// result is returned via this Future's executor.
// NOTE(review): the condition under which the singleton Timekeeper is
// fetched is not visible; presumably it applies when tk is null.
953 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
956 Context(E ex) : exception(std::move(ex)), promise() {}
958 Future<Unit> thisFuture;
960 std::atomic<bool> token {false};
963 std::shared_ptr<Timekeeper> tks;
965 tks = folly::detail::getTimekeeperSingleton();
966 tk = DCHECK_NOTNULL(tks.get());
969 auto ctx = std::make_shared<Context>(std::move(e));
971 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
972 // TODO: "this" completed first, cancel "after"
973 if (ctx->token.exchange(true) == false) {
974 ctx->promise.setTry(std::move(t));
978 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
979 // "after" completed first, cancel "this"
980 ctx->thisFuture.raise(TimedOut());
981 if (ctx->token.exchange(true) == false) {
982 if (t.hasException()) {
983 ctx->promise.setException(std::move(t.exception()));
985 ctx->promise.setException(std::move(ctx->exception));
990 return ctx->promise.getFuture().via(getExecutor());
// Waits for both this Future and a futures::sleep(dur, tk) Future through
// collectAll, then returns a Future built from this Future's Try. The
// sleep's own Try is ignored.
996 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
997 return collectAll(*this, futures::sleep(dur, tk))
998 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
999 Try<T>& t = std::get<0>(tup);
1000 return makeFuture<T>(std::move(t));
// Blocking wait helper. Returns at once when f is ready; otherwise it
// installs a callback that posts a stack-allocated baton (captured by
// reference) and asserts readiness afterwards.
// NOTE(review): the baton wait call itself is not visible in this excerpt.
1007 void waitImpl(Future<T>& f) {
1008 // short-circuit if there's nothing to do
1009 if (f.isReady()) return;
1011 FutureBatonType baton;
1012 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1014 assert(f.isReady());
// Timed wait helper. The result is redirected into a fresh promise whose
// Future is `ret`, and the baton lives in a shared_ptr that the callback
// also captures. timed_wait(dur) returning true leads to the readiness
// assertion.
// NOTE(review): the reassignment of f to `ret` and the baton post inside
// the callback are not visible in this excerpt.
1018 void waitImpl(Future<T>& f, Duration dur) {
1019 // short-circuit if there's nothing to do
1025 auto ret = promise.getFuture();
1026 auto baton = std::make_shared<FutureBatonType>();
1027 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1028 promise.setTry(std::move(t));
1032 if (baton->timed_wait(dur)) {
1033 assert(f.isReady());
// Wait helper for a DrivableExecutor: replaces f with a continuation
// scheduled via e, then loops until f is ready and asserts readiness.
// NOTE(review): the loop body is not visible; presumably it drives the
// executor -- confirm.
1038 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1039 // Set callback so to ensure that the via executor has something on it
1040 // so that once the preceding future triggers this callback, drive will
1041 // always have a callback to satisfy it
1044 f = f.via(e).then([](T&& t) { return std::move(t); });
1045 while (!f.isReady()) {
1048 assert(f.isReady());
// Lvalue overload: blocks through detail::waitImpl.
// NOTE(review): the return statement is not visible; the signature
// returns a reference to a Future<T>.
1054 Future<T>& Future<T>::wait() & {
1055 detail::waitImpl(*this);
// Rvalue overload: blocks through detail::waitImpl and returns this
// Future as an rvalue reference.
1060 Future<T>&& Future<T>::wait() && {
1061 detail::waitImpl(*this);
1062 return std::move(*this);
// Lvalue overload with a time limit: calls detail::waitImpl(*this, dur).
// NOTE(review): the return statement is not visible; the signature
// returns a reference to a Future<T>.
1066 Future<T>& Future<T>::wait(Duration dur) & {
1067 detail::waitImpl(*this, dur);
// Rvalue overload with a time limit: calls detail::waitImpl(*this, dur)
// and returns this Future as an rvalue reference.
1072 Future<T>&& Future<T>::wait(Duration dur) && {
1073 detail::waitImpl(*this, dur);
1074 return std::move(*this);
// Lvalue overload: waits through detail::waitViaImpl with executor e.
// NOTE(review): the return statement is not visible; the signature
// returns a reference to a Future<T>.
1078 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1079 detail::waitViaImpl(*this, e);
// Rvalue overload: waits through detail::waitViaImpl with executor e and
// returns this Future as an rvalue reference.
1084 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1085 detail::waitViaImpl(*this, e);
1086 return std::move(*this);
// Blocks with wait() and returns the value, moved out of the Future.
1090 T Future<T>::get() {
1091 return std::move(wait().value());
// Timed variant of get(): the visible path returns the value, moved out
// of the Future.
// NOTE(review): the timed wait and the handling of a Future that is still
// not ready after dur are not visible in this excerpt.
1095 T Future<T>::get(Duration dur) {
1098 return std::move(value());
// Waits with waitVia(e) and returns the value, moved out of the Future.
1105 T Future<T>::getVia(DrivableExecutor* e) {
1106 return std::move(waitVia(e).value());
// Compares the values held by two Trys with operator==. value() is called
// on both Trys; the caller in willEqual() checks hasValue() on both first.
1112 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1113 return t1.value() == t2.value();
// Produces a Future<bool> from the outcome of both Futures: when both
// Trys hold values, the result is detail::TryEquals<T>::equals on them.
// NOTE(review): the branch taken when either side lacks a value is not
// visible; presumably it yields false -- confirm.
1119 Future<bool> Future<T>::willEqual(Future<T>& f) {
1120 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1121 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1122 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
// Chains a continuation that hands the value to the predicate as a const
// reference and throws PredicateDoesNotObtain when the predicate rejects
// it.
// NOTE(review): the statement returning the accepted value is not visible.
1131 Future<T> Future<T>::filter(F&& predicate) {
1132 return this->then([p = std::forward<F>(predicate)](T val) {
1133 T const& valConstRef = val;
1134 if (!p(valConstRef)) {
1135 throw PredicateDoesNotObtain();
// Base case of thenMulti: a single callback is forwarded to then().
1142 template <class Callback>
1143 auto Future<T>::thenMulti(Callback&& fn)
1144 -> decltype(this->then(std::forward<Callback>(fn))) {
1145 // thenMulti with one callback is just a then
1146 return then(std::forward<Callback>(fn));
// Recursive case: chains the first callback with then() and passes the
// remaining callbacks to thenMulti on the resulting Future.
1150 template <class Callback, class... Callbacks>
1151 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1152 -> decltype(this->then(std::forward<Callback>(fn)).
1153 thenMulti(std::forward<Callbacks>(fns)...)) {
1154 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1155 return then(std::forward<Callback>(fn)).
1156 thenMulti(std::forward<Callbacks>(fns)...);
// Chains several callbacks and finally moves the result back onto the
// executor that was current on entry (saved in oldX).
// NOTE(review): the statement that installs x before the chain is not
// visible in this excerpt.
1160 template <class Callback, class... Callbacks>
1161 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1163 -> decltype(this->then(std::forward<Callback>(fn)).
1164 thenMulti(std::forward<Callbacks>(fns)...)) {
1165 // thenMultiExecutor with two callbacks is
1166 // via(x).then(a).thenMulti(b, ...).via(oldX)
1167 auto oldX = getExecutor();
1169 return then(std::forward<Callback>(fn)).
1170 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
// Single-callback case: delegates to the executor-taking then() overload.
1174 template <class Callback>
1175 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1176 -> decltype(this->then(std::forward<Callback>(fn))) {
1177 // thenMulti with one callback is just a then with an executor
1178 return then(x, std::forward<Callback>(fn));
// Runs thunk only when p is true and converts its Future with unit();
// when p is false the thunk is not called and a ready Future<Unit> is
// returned.
1182 inline Future<Unit> when(bool p, F&& thunk) {
1183 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// Asynchronous loop: the visible path runs thunk once, then chains a
// continuation (owning predicate and thunk) that calls whileDo again. The
// final return yields a ready Future<Unit>.
// NOTE(review): the predicate test selecting between the two paths is not
// visible; presumably the loop continues while predicate() is true.
1186 template <class P, class F>
1187 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1189 auto future = thunk();
1190 return future.then([
1191 predicate = std::forward<P>(predicate),
1192 thunk = std::forward<F>(thunk)
1194 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1197 return makeFuture();
// Builds on folly::whileDo with a counting predicate. The counter is a
// heap-allocated std::atomic<int> owned by the predicate; each predicate
// call reads and increments it, and is true while the value read is
// below n.
1201 Future<Unit> times(const int n, F&& thunk) {
1202 return folly::whileDo(
1203 [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
1204 return count->fetch_add(1) < n;
1206 std::forward<F>(thunk));
// Chains func onto every Future in [first, last) with then() and appends
// each resulting Future to `results`, in input order.
1210 template <class It, class F, class ItT, class Result>
1211 std::vector<Future<Result>> map(It first, It last, F func) {
1212 std::vector<Future<Result>> results;
1213 for (auto it = first; it != last; it++) {
1214 results.push_back(it->then(func));
// Empty tag types for overload dispatch. retrying_policy_traits below
// picks the raw tag for policies returning bool and the fut tag for
// policies returning Future<bool>.
1224 struct retrying_policy_raw_tag {};
1225 struct retrying_policy_fut_tag {};
// Classifies a retry policy by the signature of its operator(). has_op
// is true when Policy has operator()(size_t, const exception_wrapper&)
// returning Ret, const-qualified or not. `tag` is the raw tag for bool,
// the fut tag for Future<bool>, and void when neither matches.
1227 template <class Policy>
1228 struct retrying_policy_traits {
1229 using ew = exception_wrapper;
1230 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1231 template <class Ret>
1232 using has_op = typename std::integral_constant<bool,
1233 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1234 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1235 using is_raw = has_op<bool>;
1236 using is_fut = has_op<Future<bool>>;
1237 using tag = typename std::conditional<
1238 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1239 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// Core retry loop. On failure the error handler takes ownership of the
// policy and the factory; depending on a boolean `r`, it either calls
// retrying again with the same k or returns a failed Future carrying the
// original exception.
// NOTE(review): the first call to ff, the policy invocation that produces
// `r`, and any increment of k are not visible in this excerpt.
1242 template <class Policy, class FF>
1243 typename std::result_of<FF(size_t)>::type
1244 retrying(size_t k, Policy&& p, FF&& ff) {
1245 using F = typename std::result_of<FF(size_t)>::type;
1246 using T = typename F::value_type;
1249 [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
1250 exception_wrapper x) mutable {
1253 [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
1255 return r ? retrying(k, std::move(pm), std::move(ffm))
1256 : makeFuture<T>(std::move(xm));
// Dispatch target for policies returning bool: adapts the policy into one
// returning Future<bool> via makeFuture<bool>, then starts the retry loop
// at attempt 0.
1261 template <class Policy, class FF>
1262 typename std::result_of<FF(size_t)>::type
1263 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1264 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1265 return makeFuture<bool>(pm(k, x));
1267 return retrying(0, std::move(q), std::forward<FF>(ff));
// Dispatch target for policies already returning Future<bool>: starts the
// retry loop at attempt 0 with the policy unchanged.
1270 template <class Policy, class FF>
1271 typename std::result_of<FF(size_t)>::type
1272 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1273 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1276 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1277 template <class URNG>
1278 Duration retryingJitteredExponentialBackoffDur(
1280 Duration backoff_min,
1281 Duration backoff_max,
1282 double jitter_param,
1285 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1286 auto jitter = std::exp(dist(rng));
1287 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1288 return std::max(backoff_min, std::min(backoff_max, backoff));
// Builds the capped, jittered backoff policy. The returned callable stops
// (Future of false) once n equals max_tries. Otherwise it consults the
// wrapped policy pm; the visible continuation can also yield false, or
// computes a jittered backoff, sleeps for it and yields true.
// NOTE(review): the test on the wrapped policy's answer that selects the
// inner `false` result is not visible in this excerpt.
1291 template <class Policy, class URNG>
1292 std::function<Future<bool>(size_t, const exception_wrapper&)>
1293 retryingPolicyCappedJitteredExponentialBackoff(
1295 Duration backoff_min,
1296 Duration backoff_max,
1297 double jitter_param,
1301 pm = std::forward<Policy>(p),
1306 rngp = std::forward<URNG>(rng)
1307 ](size_t n, const exception_wrapper& ex) mutable {
1308 if (n == max_tries) {
1309 return makeFuture(false);
1311 return pm(n, ex).then(
1312 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1315 return makeFuture(false);
1317 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1318 n, backoff_min, backoff_max, jitter_param, rngp);
1319 return futures::sleep(backoff).then([] { return true; });
// Dispatch target for wrapped policies returning bool: adapts the policy
// into one returning a Future via makeFuture and forwards to the builder
// overload.
// NOTE(review): most of the forwarded argument list is not visible here.
1324 template <class Policy, class URNG>
1325 std::function<Future<bool>(size_t, const exception_wrapper&)>
1326 retryingPolicyCappedJitteredExponentialBackoff(
1328 Duration backoff_min,
1329 Duration backoff_max,
1330 double jitter_param,
1333 retrying_policy_raw_tag) {
1334 auto q = [pm = std::forward<Policy>(p)](
1335 size_t n, const exception_wrapper& e) {
1336 return makeFuture(pm(n, e));
1338 return retryingPolicyCappedJitteredExponentialBackoff(
1343 std::forward<URNG>(rng),
// Dispatch target for wrapped policies already returning Future<bool>:
// forwards the generator and the policy unchanged to the builder overload.
// NOTE(review): most of the forwarded argument list is not visible here.
1347 template <class Policy, class URNG>
1348 std::function<Future<bool>(size_t, const exception_wrapper&)>
1349 retryingPolicyCappedJitteredExponentialBackoff(
1351 Duration backoff_min,
1352 Duration backoff_max,
1353 double jitter_param,
1356 retrying_policy_fut_tag) {
1357 return retryingPolicyCappedJitteredExponentialBackoff(
1362 std::forward<URNG>(rng),
1363 std::forward<Policy>(p));
// Public entry point: looks up the policy's tag through
// detail::retrying_policy_traits and dispatches to the matching
// detail::retrying overload.
1367 template <class Policy, class FF>
1368 typename std::result_of<FF(size_t)>::type
1369 retrying(Policy&& p, FF&& ff) {
1370 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1371 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
// Returns a policy that ignores the exception and permits another attempt
// while the attempt number n is below max_tries (captured by copy).
1375 std::function<bool(size_t, const exception_wrapper&)>
1376 retryingPolicyBasic(
1378 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
// Public overload taking a generator and a wrapped policy: looks up the
// policy's tag through detail::retrying_policy_traits and forwards to the
// matching detail overload.
// NOTE(review): most of the forwarded argument list is not visible here.
1381 template <class Policy, class URNG>
1382 std::function<Future<bool>(size_t, const exception_wrapper&)>
1383 retryingPolicyCappedJitteredExponentialBackoff(
1385 Duration backoff_min,
1386 Duration backoff_max,
1387 double jitter_param,
1390 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1391 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1396 std::forward<URNG>(rng),
1397 std::forward<Policy>(p),
// Convenience overload without a caller-supplied policy: wraps a policy
// that always answers true and delegates to the overload above.
// NOTE(review): the forwarded arguments, including whichever random
// generator is used, are not visible in this excerpt.
1402 std::function<Future<bool>(size_t, const exception_wrapper&)>
1403 retryingPolicyCappedJitteredExponentialBackoff(
1405 Duration backoff_min,
1406 Duration backoff_max,
1407 double jitter_param) {
1408 auto p = [](size_t, const exception_wrapper&) { return true; };
1409 return retryingPolicyCappedJitteredExponentialBackoff(
// Explicit instantiation declarations: translation units including this
// header do not implicitly instantiate these specialisations.
// NOTE(review): the matching explicit instantiation definitions are
// presumably in a source file of the library -- confirm.
1420 // Instantiate the most common Future types to save compile time
1421 extern template class Future<Unit>;
1422 extern template class Future<bool>;
1423 extern template class Future<int>;
1424 extern template class Future<int64_t>;
1425 extern template class Future<std::string>;
1426 extern template class Future<double>;
1428 } // namespace folly