2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
// Selects whether Future's blocking waits use the fiber baton: the macro is
// defined as 0 when __ANDROID__ or __APPLE__ is defined and (presumably in the
// alternative branch) as 1, in which case the fibers Baton header is included.
31 #if defined(__ANDROID__) || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/experimental/fibers/Baton.h>
// FutureBatonType is the baton type used by the wait helpers further down:
// folly::fibers::Baton when FOLLY_FUTURE_USING_FIBER is non-zero, and
// folly::Baton<> as the other alternative.
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
// Declaration only; within() calls this when it needs a Timekeeper of its own.
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
// Move constructor: copies other's core_ pointer and then nulls it out in
// `other`, so the moved-from Future no longer refers to the core.
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56 other.core_ = nullptr;
// Move assignment: exchanges core_ pointers with `other`. This Future's
// previous core is therefore not released here; it ends up held by `other`.
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61 std::swap(core_, other.core_);
// Value constructor: allocates a new Core already holding a Try<T> built from
// the forwarded `val`. The unnamed second template parameter is presumably an
// enable_if constraint supplied by the declaration -- confirm in the header.
66 template <class T2, typename>
67 Future<T>::Future(T2&& val)
68 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Constructor that takes no value: allocates a new Core holding a Try<T> that
// wraps a value-initialized T. The two unnamed template parameters are
// presumably constraints supplied by the declaration -- confirm in the header.
71 template <typename, typename>
73 : core_(new detail::Core<T>(Try<T>(T()))) {}
// Destructor. NOTE(review): the body is not visible in this excerpt;
// presumably it hands off to detach() -- confirm.
76 Future<T>::~Future() {
// Notifies the Core that the Future side is letting go by calling
// Core::detachFuture().
81 void Future<T>::detach() {
83 core_->detachFuture();
// Validity check. NOTE(review): the body is not visible in this excerpt;
// presumably it throws when core_ is null -- confirm.
89 void Future<T>::throwIfInvalid() const {
// Installs `func` as the Core's callback.
// NOTE(review): func is taken as F&& but passed on with std::move, so an
// lvalue argument would be moved from; confirm that this is intended rather
// than std::forward<F>.
96 void Future<T>::setCallback_(F&& func) {
98 core_->setCallback(std::move(func));
// unwrap(): available only when isFuture<F>::value is true. Chains a
// continuation that takes the inner Future and returns it as-is, so the
// result type is Future<isFuture<T>::Inner> instead of a Future of a Future.
105 typename std::enable_if<isFuture<F>::value,
106 Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108 return then([](Future<typename isFuture<T>::Inner> internal_future) {
109 return internal_future;
// thenImplementation for continuations whose result is not a Future
// (R::ReturnsFuture::value is false). It creates a Promise<B>, copies this
// Future's interrupt handler onto the promise's core and this Future's
// executor onto the new Future's core, and builds the continuation lambda
// shown at the end of the block. Accepts callbacks of at most one argument.
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
121 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122 typedef typename R::ReturnsFuture::Inner B;
126 // wrap these so we can move them into the lambda
127 folly::MoveWrapper<Promise<B>> p;
128 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
129 folly::MoveWrapper<F> funcm(std::forward<F>(func));
131 // grab the Future now before we lose our handle on the Promise
132 auto f = p->getFuture();
133 f.core_->setExecutorNoLock(getExecutor());
135 /* This is a bit tricky.
137 We can't just close over *this in case this Future gets moved. So we
138 make a new dummy Future. We could figure out something more
139 sophisticated that avoids making a new Future object when it can, as an
140 optimization. But this is correct.
142 core_ can't be moved, it is explicitly disallowed (as is copying). But
143 if there's ever a reason to allow it, this is one place that makes that
144 assumption and would need to be fixed. We use a standard shared pointer
145 for core_ (by copying it in), which means in essence obj holds a shared
146 pointer to itself. But this shouldn't leak because Promise will not
147 outlive the continuation, because Promise will setException() with a
148 broken Promise if it is destructed before completed. We could use a
149 weak pointer but it would have to be converted to a shared pointer when
150 func is executed (because the Future returned by func may possibly
151 persist beyond the callback, if it gets moved), and so it is an
152 optimization to just make it shared from the get-go.
154 We have to move in the Promise and func using the MoveWrapper
155 hack. (func could be copied but it's a big drag on perf).
157 Two subtle but important points about this design. detail::Core has no
158 back pointers to Future or Promise, so if Future or Promise get moved
159 (and they will be moved in performant code) we don't have to do
160 anything fancy. And because we store the continuation in the
161 detail::Core, not in the Future, we can execute the continuation even
162 after the Future has gone out of scope. This is an intentional design
163 decision. It is likely we will want to be able to cancel a continuation
164 in some circumstances, but I think it should be explicit not implicit
165 in the destruction of the Future used to create it.
// Continuation: when func does not take a Try (isTry is false) and the
// incoming Try holds an exception, the exception is forwarded to the promise;
// the other visible path invokes func with the argument unpacked from the Try.
168 [p, funcm](Try<T>&& t) mutable {
169 if (!isTry && t.hasException()) {
170 p->setException(std::move(t.exception()));
173 return (*funcm)(t.template get<isTry, Args>()...);
// thenImplementation for continuations that themselves return a Future
// (R::ReturnsFuture::value is true). B is the inner type of that Future. A
// Promise<B> is created, this Future's interrupt handler is copied onto the
// promise's core and this Future's executor onto the new Future's core.
181 // Variant: returns a Future
182 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
184 template <typename F, typename R, bool isTry, typename... Args>
185 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
186 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
187 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
188 typedef typename R::ReturnsFuture::Inner B;
192 // wrap these so we can move them into the lambda
193 folly::MoveWrapper<Promise<B>> p;
194 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
195 folly::MoveWrapper<F> funcm(std::forward<F>(func));
197 // grab the Future now before we lose our handle on the Promise
198 auto f = p->getFuture();
199 f.core_->setExecutorNoLock(getExecutor());
// Continuation: an incoming exception is forwarded to the promise when func
// does not take a Try. On the other path func is called, and a callback is
// installed on the Future it returns (f2) that fulfils the promise with f2's
// Try. Exceptions caught around that call are stored in the promise as an
// exception_wrapper.
202 [p, funcm](Try<T>&& t) mutable {
203 if (!isTry && t.hasException()) {
204 p->setException(std::move(t.exception()));
207 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
208 // that didn't throw, now we can steal p
209 f2.setCallback_([p](Try<B>&& b) mutable {
210 p->setTry(std::move(b));
212 } catch (const std::exception& e) {
213 p->setException(exception_wrapper(std::current_exception(), e));
215 p->setException(exception_wrapper(std::current_exception()));
// then() overload taking a pointer to member function plus the object to call
// it on. It wraps (instance->*func)(...) in a lambda and delegates to then().
// FirstArg is the member's first parameter type with reference and cv
// qualifiers removed; isTry<FirstArg> selects whether the member receives the
// Try<T> or the value extracted from it. `instance` is captured as a raw
// pointer, so the object has to stay alive until the continuation has run.
223 template <typename T>
224 template <typename R, typename Caller, typename... Args>
225 Future<typename isFuture<R>::Inner>
226 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
227 typedef typename std::remove_cv<
228 typename std::remove_reference<
229 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
230 return then([instance, func](Try<T>&& t){
231 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// then() overload that is given an executor `x` in addition to the usual
// then() arguments, which are forwarded unchanged. The current executor is
// remembered in oldX before chaining.
// NOTE(review): the statements that switch to `x` and that use oldX afterwards
// are not visible in this excerpt.
236 template <class Executor, class Arg, class... Args>
237 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
238 -> decltype(this->then(std::forward<Arg>(arg),
239 std::forward<Args>(args)...))
241 auto oldX = getExecutor();
243 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
// then() with no callback: chains an empty lambda and returns the resulting
// Future<Unit>.
248 Future<Unit> Future<T>::then() {
249 return then([] () {});
// Overload selected when func is not callable with an exception_wrapper and
// does not return a Future. Exn, the exception type handled, is func's first
// argument type; the static_assert requires func's raw return type to be T.
// A new promise inherits this Future's interrupt handler, and the callback
// installed on this Future uses Try::withException<Exn> to dispatch.
// NOTE(review): the body of the withException lambda is not visible here.
252 // onError where the callback returns T
255 typename std::enable_if<
256 !detail::callableWith<F, exception_wrapper>::value &&
257 !detail::Extract<F>::ReturnsFuture::value,
259 Future<T>::onError(F&& func) {
260 typedef typename detail::Extract<F>::FirstArg Exn;
262 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
263 "Return type of onError callback must be T or Future<T>");
266 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
267 auto f = p.getFuture();
268 auto pm = folly::makeMoveWrapper(std::move(p));
269 auto funcm = folly::makeMoveWrapper(std::move(func));
270 setCallback_([pm, funcm](Try<T>&& t) mutable {
271 if (!t.template withException<Exn>([&] (Exn& e) {
// Reached when withException returned false: the Try is handed to the new
// promise untouched.
276 pm->setTry(std::move(t));
// Overload selected when func is not callable with an exception_wrapper and
// returns a Future; the static_assert requires that Future to be Future<T>.
// Exn, the exception type handled, is func's first argument type.
// NOTE(review): unlike the T-returning overload, no call to
// setInterruptHandlerNoLock is visible here -- confirm whether that is
// intentional.
283 // onError where the callback returns Future<T>
286 typename std::enable_if<
287 !detail::callableWith<F, exception_wrapper>::value &&
288 detail::Extract<F>::ReturnsFuture::value,
290 Future<T>::onError(F&& func) {
292 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
293 "Return type of onError callback must be T or Future<T>");
294 typedef typename detail::Extract<F>::FirstArg Exn;
297 auto f = p.getFuture();
298 auto pm = folly::makeMoveWrapper(std::move(p));
299 auto funcm = folly::makeMoveWrapper(std::move(func));
// If the Try holds an Exn, func is called with it and the Future it returns
// (f2) gets a callback that fulfils the promise. Exceptions caught around that
// call are stored in the promise. If withException returns false the original
// Try is forwarded to the promise.
300 setCallback_([pm, funcm](Try<T>&& t) mutable {
301 if (!t.template withException<Exn>([&] (Exn& e) {
303 auto f2 = (*funcm)(e);
304 f2.setCallback_([pm](Try<T>&& t2) mutable {
305 pm->setTry(std::move(t2));
307 } catch (const std::exception& e2) {
308 pm->setException(exception_wrapper(std::current_exception(), e2));
310 pm->setException(exception_wrapper(std::current_exception()));
313 pm->setTry(std::move(t));
// ensure(): chains a continuation that receives this Future's Try<T> (value
// or exception) and passes it on unchanged through makeFuture.
// NOTE(review): the invocation of the wrapped func is not visible in this
// excerpt; presumably it runs before the Try is forwarded.
322 Future<T> Future<T>::ensure(F func) {
323 MoveWrapper<F> funcw(std::move(func));
324 return this->then([funcw](Try<T>&& t) mutable {
326 return makeFuture(std::move(t));
// onTimeout(): applies within(dur, tk) and attaches an onError handler for
// TimedOut that returns the result of calling func.
332 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
333 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
334 return within(dur, tk)
335 .onError([funcw](TimedOut const&) { return (*funcw)(); });
// onError overload selected when func is callable with an exception_wrapper
// and returns a Future; the static_assert requires that Future to be
// Future<T>. Any exception held by the Try is handed to func as an
// exception_wrapper.
340 typename std::enable_if<
341 detail::callableWith<F, exception_wrapper>::value &&
342 detail::Extract<F>::ReturnsFuture::value,
344 Future<T>::onError(F&& func) {
346 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
347 "Return type of onError callback must be T or Future<T>");
350 auto f = p.getFuture();
351 auto pm = folly::makeMoveWrapper(std::move(p));
352 auto funcm = folly::makeMoveWrapper(std::move(func));
// On exception: call func with the moved exception_wrapper and fulfil the
// promise from the Future it returns (f2). Exceptions caught around that call
// are stored in the promise. The final setTry forwards the original Try.
353 setCallback_([pm, funcm](Try<T> t) mutable {
354 if (t.hasException()) {
356 auto f2 = (*funcm)(std::move(t.exception()));
357 f2.setCallback_([pm](Try<T> t2) mutable {
358 pm->setTry(std::move(t2));
360 } catch (const std::exception& e2) {
361 pm->setException(exception_wrapper(std::current_exception(), e2));
363 pm->setException(exception_wrapper(std::current_exception()));
366 pm->setTry(std::move(t));
// Overload selected when func is callable with an exception_wrapper and does
// not return a Future.
// NOTE(review): the static_assert compares Extract<F>::Return with Future<T>
// although this overload is for callbacks returning T; whether Return is the
// Future-wrapped type depends on detail::Extract, which is not visible here.
373 // onError(exception_wrapper) that returns T
376 typename std::enable_if<
377 detail::callableWith<F, exception_wrapper>::value &&
378 !detail::Extract<F>::ReturnsFuture::value,
380 Future<T>::onError(F&& func) {
382 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
383 "Return type of onError callback must be T or Future<T>");
386 auto f = p.getFuture();
387 auto pm = folly::makeMoveWrapper(std::move(p));
388 auto funcm = folly::makeMoveWrapper(std::move(func));
// On exception func is called with the moved exception_wrapper; the final
// setTry forwards the original Try to the promise.
389 setCallback_([pm, funcm](Try<T> t) mutable {
390 if (t.hasException()) {
392 return (*funcm)(std::move(t.exception()));
395 pm->setTry(std::move(t));
// value(): returns an lvalue reference to the value inside the Core's Try,
// obtained through Try::value().
403 typename std::add_lvalue_reference<T>::type Future<T>::value() {
406 return core_->getTry().value();
// Const overload: same access path, returning a reference to const T.
410 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
413 return core_->getTry().value();
// Returns a reference to the Try<T> stored in the Core.
417 Try<T>& Future<T>::getTry() {
420 return core_->getTry();
// poll(): non-blocking check. When the Core reports ready(), the Try is moved
// out of the Core into the Optional `o`. NOTE(review): the declaration and
// return of `o` are not visible in this excerpt.
424 Optional<Try<T>> Future<T>::poll() {
426 if (core_->ready()) {
427 o = std::move(core_->getTry());
// via() for rvalue Futures: sets the executor and priority on this Future and
// returns it by moving *this.
433 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
436 setExecutor(executor, priority);
438 return std::move(*this);
// via() for lvalue Futures: creates a fresh Promise/Future pair, chains a
// continuation on this Future that forwards its Try to the promise, and
// returns the new Future after applying the rvalue via() to it.
442 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
445 MoveWrapper<Promise<T>> p;
446 auto f = p->getFuture();
447 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
448 return std::move(f).via(executor, priority);
// Free function via(x, func): shorthand for via(x).then(func). The result is
// a Future of func's return type, flattened through isFuture<...>::Inner.
452 template <class Func>
453 auto via(Executor* x, Func func)
454 -> Future<typename isFuture<decltype(func())>::Inner>
456 // TODO make this actually more performant. :-P #7260175
457 return via(x).then(func);
// Reports whether the Core is ready, as given by Core::ready().
461 bool Future<T>::isReady() const {
463 return core_->ready();
// True when the Try obtained from getTry() holds a value.
467 bool Future<T>::hasValue() {
468 return getTry().hasValue();
// True when the Try obtained from getTry() holds an exception.
472 bool Future<T>::hasException() {
473 return getTry().hasException();
// Forwards `exception` to Core::raise(); the interrupt handlers copied around
// by then() and onError() are the visible consumers of this mechanism.
477 void Future<T>::raise(exception_wrapper exception) {
478 core_->raise(std::move(exception));
// makeFuture(value): wraps the forwarded value in a Try of the decayed type
// and delegates to the Try overload of makeFuture.
484 Future<typename std::decay<T>::type> makeFuture(T&& t) {
485 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// makeFuture(): returns a Future<Unit> built from a default Unit value.
488 inline // for multiple translation units
489 Future<Unit> makeFuture() {
490 return makeFuture(Unit{});
// Overload enabled when calling func yields a Future; the return type is that
// same Future type. The visible handlers turn a caught exception into a
// Future of the inner type holding an exception_wrapper.
// NOTE(review): the call to func itself is not visible in this excerpt.
493 // makeFutureWith(Future<T>()) -> Future<T>
495 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
496 typename std::result_of<F()>::type>::type
497 makeFutureWith(F&& func) {
499 typename isFuture<typename std::result_of<F()>::type>::Inner;
502 } catch (std::exception& e) {
503 return makeFuture<InnerType>(
504 exception_wrapper(std::current_exception(), e));
506 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
// Overload enabled when calling func does not yield a Future. The result type
// is passed through Unit::Lift, and the Future is built from makeTryWith.
// NOTE(review): the body of the lambda given to makeTryWith is not visible
// in this excerpt.
510 // makeFutureWith(T()) -> Future<T>
511 // makeFutureWith(void()) -> Future<Unit>
513 typename std::enable_if<
514 !(isFuture<typename std::result_of<F()>::type>::value),
515 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
516 makeFutureWith(F&& func) {
518 typename Unit::Lift<typename std::result_of<F()>::type>::type;
519 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
// makeFuture from a std::exception_ptr: the Future holds a Try<T> constructed
// from the exception pointer.
525 Future<T> makeFuture(std::exception_ptr const& e) {
526 return makeFuture(Try<T>(e));
// makeFuture from an exception_wrapper: the wrapper is moved into a Try<T>.
530 Future<T> makeFuture(exception_wrapper ew) {
531 return makeFuture(Try<T>(std::move(ew)));
// makeFuture from an exception object: enabled only for types derived from
// std::exception. A copy of `e` is wrapped with make_exception_wrapper<E> and
// stored in the Try<T>.
534 template <class T, class E>
535 typename std::enable_if<std::is_base_of<std::exception, E>::value,
537 makeFuture(E const& e) {
538 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// makeFuture from a Try: allocates a new Core holding the moved Try and wraps
// it in a Future. The other makeFuture overloads funnel into this one.
542 Future<T> makeFuture(Try<T>&& t) {
543 return Future<T>(new detail::Core<T>(std::move(t)));
// via(executor, priority): a Future<Unit> from makeFuture() with the given
// executor and priority applied.
547 Future<Unit> via(Executor* executor, int8_t priority) {
548 return makeFuture().via(executor, priority);
// Walks [first, last) and installs a callback on each Future that invokes
// func with the Future's zero-based position in the range and its Try.
// func is copied into every callback.
551 // mapSetCallback calls func(i, Try<T>) when every future completes
553 template <class T, class InputIterator, class F>
554 void mapSetCallback(InputIterator first, InputIterator last, F func) {
555 for (size_t i = 0; first != last; ++first, ++i) {
556 first->setCallback_([func, i](Try<T>&& t) {
557 func(i, std::move(t));
// Allocates a shared CollectAllVariadicContext for the value types of the
// given Futures, passes it together with the futures to
// detail::collectVariadicHelper, and returns the Future of the context's
// promise.
562 // collectAll (variadic)
564 template <typename... Fs>
565 typename detail::CollectAllVariadicContext<
566 typename std::decay<Fs>::type::value_type...>::type
567 collectAll(Fs&&... fs) {
568 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
569 typename std::decay<Fs>::type::value_type...>>();
570 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
571 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
572 return ctx->p.getFuture();
// Collects the Try of every Future in [first, last) into a vector, in input
// order. Each callback stores its Try at its own index in the shared context.
// The promise is fulfilled from the context's destructor, i.e. once the last
// callback holding the shared_ptr has released it.
// NOTE(review): the constructor takes an int while it is passed the result of
// std::distance, so the count is narrowed.
575 // collectAll (iterator)
577 template <class InputIterator>
580 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
581 collectAll(InputIterator first, InputIterator last) {
583 typename std::iterator_traits<InputIterator>::value_type::value_type T;
585 struct CollectAllContext {
586 CollectAllContext(int n) : results(n) {}
587 ~CollectAllContext() {
588 p.setValue(std::move(results));
590 Promise<std::vector<Try<T>>> p;
591 std::vector<Try<T>> results;
594 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
595 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
596 ctx->results[i] = std::move(t);
598 return ctx->p.getFuture();
// Shared state for the iterator form of collect().
//  - Nothing: placeholder type constructible from an int.
//  - Result / InternalResult: chosen with std::conditional on
//    std::is_void<T>; for non-void T they are std::vector<T> and
//    std::vector<Optional<T>>.
//  - threw: atomic flag that guards one-time completion of the promise.
601 // collect (iterator)
605 template <typename T>
606 struct CollectContext {
607 struct Nothing { explicit Nothing(int n) {} };
609 using Result = typename std::conditional<
610 std::is_void<T>::value,
612 std::vector<T>>::type;
614 using InternalResult = typename std::conditional<
615 std::is_void<T>::value,
617 std::vector<Optional<T>>>::type;
619 explicit CollectContext(int n) : result(n) {}
// NOTE(review): the header of the member that contains the next statements is
// not visible in this excerpt (presumably the destructor). If no exception
// has been recorded, the Optional slots are moved into a plain vector and the
// promise is fulfilled with it.
621 if (!threw.exchange(true)) {
622 // map Optional<T> -> T
623 std::vector<T> finalResult;
624 finalResult.reserve(result.size());
625 std::transform(result.begin(), result.end(),
626 std::back_inserter(finalResult),
627 [](Optional<T>& o) { return std::move(o.value()); });
628 p.setValue(std::move(finalResult));
// Stores the value of Try `t` into slot i, moving it out of the Try.
631 inline void setPartialResult(size_t i, Try<T>& t) {
632 result[i] = std::move(t.value());
635 InternalResult result;
636 std::atomic<bool> threw {false};
// collect (iterator form): gathers the values of all Futures in
// [first, last). The first callback that sees an exception wins the `threw`
// exchange and fails the promise with it. Values are recorded only while
// `threw` is still false.
641 template <class InputIterator>
642 Future<typename detail::CollectContext<
643 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
644 collect(InputIterator first, InputIterator last) {
646 typename std::iterator_traits<InputIterator>::value_type::value_type T;
648 auto ctx = std::make_shared<detail::CollectContext<T>>(
649 std::distance(first, last));
650 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
651 if (t.hasException()) {
652 if (!ctx->threw.exchange(true)) {
653 ctx->p.setException(std::move(t.exception()));
655 } else if (!ctx->threw) {
656 ctx->setPartialResult(i, t);
659 return ctx->p.getFuture();
// Allocates a shared CollectVariadicContext for the value types of the given
// Futures, passes it together with the futures to
// detail::collectVariadicHelper, and returns the Future of the context's
// promise.
662 // collect (variadic)
664 template <typename... Fs>
665 typename detail::CollectVariadicContext<
666 typename std::decay<Fs>::type::value_type...>::type
667 collect(Fs&&... fs) {
668 auto ctx = std::make_shared<detail::CollectVariadicContext<
669 typename std::decay<Fs>::type::value_type...>>();
670 detail::collectVariadicHelper<detail::CollectVariadicContext>(
671 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
672 return ctx->p.getFuture();
// Completes with the index and Try of whichever Future in [first, last) has
// its callback run first. The atomic `done` flag makes sure only that first
// callback fulfils the promise; later completions are ignored.
675 // collectAny (iterator)
677 template <class InputIterator>
682 std::iterator_traits<InputIterator>::value_type::value_type>>>
683 collectAny(InputIterator first, InputIterator last) {
685 typename std::iterator_traits<InputIterator>::value_type::value_type T;
687 struct CollectAnyContext {
688 CollectAnyContext() {};
689 Promise<std::pair<size_t, Try<T>>> p;
690 std::atomic<bool> done {false};
693 auto ctx = std::make_shared<CollectAnyContext>();
694 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
695 if (!ctx->done.exchange(true)) {
696 ctx->p.setValue(std::make_pair(i, std::move(t)));
699 return ctx->p.getFuture();
// Completes with (index, Try) pairs for Futures from [first, last), up to n
// of them, recorded in completion order. If the range holds fewer than n
// Futures the promise is failed with std::runtime_error("Not enough futures").
// NOTE(review): the condition on the completion counter `c` that guards the
// emplace_back and the setTry is not visible in this excerpt.
702 // collectN (iterator)
704 template <class InputIterator>
705 Future<std::vector<std::pair<size_t, Try<typename
706 std::iterator_traits<InputIterator>::value_type::value_type>>>>
707 collectN(InputIterator first, InputIterator last, size_t n) {
709 std::iterator_traits<InputIterator>::value_type::value_type T;
710 typedef std::vector<std::pair<size_t, Try<T>>> V;
712 struct CollectNContext {
714 std::atomic<size_t> completed = {0};
717 auto ctx = std::make_shared<CollectNContext>();
719 if (size_t(std::distance(first, last)) < n) {
720 ctx->p.setException(std::runtime_error("Not enough futures"));
722 // for each completed Future, increase count and add to vector, until we
723 // have n completed futures at which point we fulfil our Promise with the
// collected vector (ctx->v).
725 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
726 auto c = ++ctx->completed;
728 assert(ctx->v.size() < n);
729 ctx->v.emplace_back(i, std::move(t));
731 ctx->p.setTry(Try<V>(std::move(ctx->v)));
737 return ctx->p.getFuture();
// reduce (iterator form): folds the results of the Futures in [first, last)
// into `initial` with func, in input order. Arg is Try<ItT> when func is
// callable with (T&&, Try<ItT>&&) and ItT otherwise, so func can choose to
// receive either the raw value or the Try. func is kept in a shared_ptr so
// every chained step can use the same instance.
// NOTE(review): the condition guarding the early makeFuture return is not
// visible in this excerpt (presumably the empty-range case).
742 template <class It, class T, class F>
743 Future<T> reduce(It first, It last, T&& initial, F&& func) {
745 return makeFuture(std::move(initial));
748 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
749 typedef typename std::conditional<
750 detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
751 typedef isTry<Arg> IsTry;
753 folly::MoveWrapper<T> minitial(std::move(initial));
754 auto sfunc = std::make_shared<F>(std::move(func));
// First step: combine the initial value with the first Future's result.
756 auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
757 return (*sfunc)(std::move(*minitial),
758 head.template get<IsTry::value, Arg&&>());
// Remaining steps: wait for both the accumulated Future and the next input
// via collectAll, then combine them.
761 for (++first; first != last; ++first) {
762 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
763 return (*sfunc)(std::move(std::get<0>(t).value()),
764 // Either return a ItT&& or a Try<ItT>&& depending
765 // on the type of the argument of func.
766 std::get<1>(t).template get<IsTry::value, Arg&&>());
// window(): calls func on each element of `input`, starting min(n, size) of
// those calls up front. Each completion fulfils the promise at its own index
// and then spawns the next element, so the i_ counter hands out every index
// exactly once. Returns one Future per input element, in input order.
// NOTE(review): ctx is captured by value in a lambda not marked mutable, so
// std::move(ctx) in the recursive spawn call yields a const rvalue; spawn
// takes a const reference, so no move happens there.
773 // window (collection)
775 template <class Collection, class F, class ItT, class Result>
776 std::vector<Future<Result>>
777 window(Collection input, F func, size_t n) {
778 struct WindowContext {
779 WindowContext(Collection&& i, F&& fn)
780 : input_(std::move(i)), promises_(input_.size()),
783 std::atomic<size_t> i_ {0};
785 std::vector<Promise<Result>> promises_;
788 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
789 size_t i = ctx->i_++;
790 if (i < ctx->input_.size()) {
791 // Using setCallback_ directly since we don't need the Future
792 ctx->func_(std::move(ctx->input_[i])).setCallback_(
793 // ctx is captured by value
794 [ctx, i](Try<Result>&& t) {
795 ctx->promises_[i].setTry(std::move(t));
796 // Chain another future onto this one
797 spawn(std::move(ctx));
803 auto max = std::min(n, input.size());
805 auto ctx = std::make_shared<WindowContext>(
806 std::move(input), std::move(func));
808 for (size_t i = 0; i < max; ++i) {
809 // Start the first n Futures
810 WindowContext::spawn(ctx);
813 std::vector<Future<Result>> futures;
814 futures.reserve(ctx->promises_.size());
815 for (auto& promise : ctx->promises_) {
816 futures.emplace_back(promise.getFuture());
// Member reduce(): for a Future whose value is an iterable container, chains
// a continuation that folds the elements into `initial` by repeatedly calling
// func(accumulator, element). Both accumulator and element are passed as
// rvalues, so the container's elements are moved from.
825 template <class I, class F>
826 Future<I> Future<T>::reduce(I&& initial, F&& func) {
827 folly::MoveWrapper<I> minitial(std::move(initial));
828 folly::MoveWrapper<F> mfunc(std::move(func));
829 return then([minitial, mfunc](T& vals) mutable {
830 auto ret = std::move(*minitial);
831 for (auto& val : vals) {
832 ret = (*mfunc)(std::move(ret), std::move(val));
// Folds the results of the Futures in [first, last) into `initial` in the
// order in which they complete. The context keeps a running memo_ Future;
// each completion chains one more func call onto it while holding the spin
// lock. Once as many completions as inputs have been chained, a callback on
// memo_ fulfils the promise.
// NOTE(review): the condition guarding the early makeFuture return is not
// visible in this excerpt (presumably the empty-range case).
838 // unorderedReduce (iterator)
840 template <class It, class T, class F, class ItT, class Arg>
841 Future<T> unorderedReduce(It first, It last, T initial, F func) {
843 return makeFuture(std::move(initial));
846 typedef isTry<Arg> IsTry;
848 struct UnorderedReduceContext {
849 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
850 : lock_(), memo_(makeFuture<T>(std::move(memo))),
851 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
853 folly::MicroSpinLock lock_; // protects memo_ and numThens_
856 size_t numThens_; // how many Futures completed and called .then()
857 size_t numFutures_; // how many Futures in total
861 auto ctx = std::make_shared<UnorderedReduceContext>(
862 std::move(initial), std::move(func), std::distance(first, last));
864 mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
865 folly::MoveWrapper<Try<ItT>> mt(std::move(t));
866 // Futures can be completed in any order, simultaneously.
867 // To make this non-blocking, we create a new Future chain in
868 // the order of completion to reduce the values.
869 // The spinlock just protects chaining a new Future, not actually
870 // executing the reduce, which should be really fast.
871 folly::MSLGuard lock(ctx->lock_);
872 ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
873 // Either return a ItT&& or a Try<ItT>&& depending
874 // on the type of the argument of func.
875 return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
877 if (++ctx->numThens_ == ctx->numFutures_) {
878 // After reducing the value of the last Future, fulfill the Promise
// NOTE(review): t2 is a Try<T> yet it is handed to setValue rather than
// setTry; confirm that Promise::setValue forwards a Try, exception included,
// as intended.
879 ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
880 ctx->promise_.setValue(std::move(t2));
885 return ctx->promise_.getFuture();
// within(dur, tk): convenience overload that uses a default-constructed
// TimedOut as the timeout exception.
891 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
892 return within(dur, TimedOut(), tk);
// within(dur, e, tk): returns a Future that completes with this Future's
// result or, if the timer fires first, with exception `e`. Two continuations
// race on the shared atomic `token`; only the one that flips it from false
// fulfils the promise. When the timekeeper singleton is used, `tks` keeps it
// alive for the duration of this call.
// NOTE(review): the condition selecting the singleton is not visible in this
// excerpt (presumably tk being null).
897 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
900 Context(E ex) : exception(std::move(ex)), promise() {}
902 Future<Unit> thisFuture;
904 std::atomic<bool> token {false};
907 std::shared_ptr<Timekeeper> tks;
909 tks = folly::detail::getTimekeeperSingleton();
910 tk = DCHECK_NOTNULL(tks.get());
913 auto ctx = std::make_shared<Context>(std::move(e));
915 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
916 // TODO: "this" completed first, cancel "after"
917 if (ctx->token.exchange(true) == false) {
918 ctx->promise.setTry(std::move(t));
// Timer side: an error reported by the timer itself takes precedence over
// the caller-supplied exception.
// NOTE(review): raise(TimedOut()) is issued before the token is checked, so
// it is also issued when "this" has already won the race.
922 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
923 // "after" completed first, cancel "this"
924 ctx->thisFuture.raise(TimedOut());
925 if (ctx->token.exchange(true) == false) {
926 if (t.hasException()) {
927 ctx->promise.setException(std::move(t.exception()));
929 ctx->promise.setException(std::move(ctx->exception));
934 return ctx->promise.getFuture().via(getExecutor());
// delayed(): completes with this Future's result, but only after both this
// Future and a futures::sleep(dur, tk) have completed (joined via collectAll).
// The sleep's own Try is discarded.
940 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
941 return collectAll(*this, futures::sleep(dur, tk))
942 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
943 Try<T>& t = std::get<0>(tup);
944 return makeFuture<T>(std::move(t));
// Untimed wait helper: returns at once if f is ready. Otherwise it installs
// a callback that posts a stack-allocated baton, captured by reference.
// NOTE(review): the wait on the baton is not visible in this excerpt.
951 void waitImpl(Future<T>& f) {
952 // short-circuit if there's nothing to do
953 if (f.isReady()) return;
955 FutureBatonType baton;
956 f.setCallback_([&](const Try<T>& t) { baton.post(); });
// Timed wait helper: returns at once if f is ready. Otherwise f's result is
// rerouted through a fresh promise whose Future is `ret`, and the function
// waits on a baton for at most `dur`. The baton is heap-allocated and shared
// with the callback, so the callback stays safe to run after a timeout has
// made this function return.
// NOTE(review): the baton post inside the callback and the use of `ret` are
// not visible in this excerpt.
962 void waitImpl(Future<T>& f, Duration dur) {
963 // short-circuit if there's nothing to do
964 if (f.isReady()) return;
966 folly::MoveWrapper<Promise<T>> promise;
967 auto ret = promise->getFuture();
968 auto baton = std::make_shared<FutureBatonType>();
969 f.setCallback_([baton, promise](Try<T>&& t) mutable {
970 promise->setTry(std::move(t));
974 if (baton->timed_wait(dur)) {
// Loops until f is ready. NOTE(review): the loop body is not visible in this
// excerpt; presumably it drives the executor `e` -- confirm.
980 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
981 while (!f.isReady()) {
// wait() on an lvalue: runs the untimed detail::waitImpl on this Future.
989 Future<T>& Future<T>::wait() & {
990 detail::waitImpl(*this);
// wait() on an rvalue: same wait, then returns *this as an rvalue reference.
995 Future<T>&& Future<T>::wait() && {
996 detail::waitImpl(*this);
997 return std::move(*this);
// wait(dur) on an lvalue: runs the timed detail::waitImpl on this Future.
1001 Future<T>& Future<T>::wait(Duration dur) & {
1002 detail::waitImpl(*this, dur);
// wait(dur) on an rvalue: same wait, then returns *this as an rvalue
// reference.
1007 Future<T>&& Future<T>::wait(Duration dur) && {
1008 detail::waitImpl(*this, dur);
1009 return std::move(*this);
// waitVia(e) on an lvalue: runs detail::waitViaImpl with the given executor.
1013 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1014 detail::waitViaImpl(*this, e);
// waitVia(e) on an rvalue: same wait, then returns *this as an rvalue
// reference.
1019 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1020 detail::waitViaImpl(*this, e);
1021 return std::move(*this);
// get(): waits for completion and returns the value, moved out of the Future.
1025 T Future<T>::get() {
1026 return std::move(wait().value());
// get(dur): timed variant; the visible path returns the value moved out of
// the Future. NOTE(review): the wait and the timeout handling are not visible
// in this excerpt.
1030 T Future<T>::get(Duration dur) {
1033 return std::move(value());
// getVia(e): waits with waitVia(e) and returns the value, moved out of the
// Future.
1040 T Future<T>::getVia(DrivableExecutor* e) {
1041 return std::move(waitVia(e).value());
// Compares the values held by two Trys with operator==. Called from
// willEqual() as detail::TryEquals<T>::equals, only after both Trys were
// checked to hold values.
1047 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1048 return t1.value() == t2.value();
// willEqual(f): Future<bool> that waits for both this Future and f (through
// collectAll) and, when both hold values, compares them with TryEquals.
// NOTE(review): the result returned when either side lacks a value is not
// visible in this excerpt.
1054 Future<bool> Future<T>::willEqual(Future<T>& f) {
1055 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1056 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1057 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
// filter(predicate): chains a continuation that evaluates the predicate on a
// const reference to the value and throws PredicateDoesNotObtain when it
// returns false. NOTE(review): the return for the passing case is not visible
// in this excerpt.
1066 Future<T> Future<T>::filter(F predicate) {
1067 auto p = folly::makeMoveWrapper(std::move(predicate));
1068 return this->then([p](T val) {
1069 T const& valConstRef = val;
1070 if (!(*p)(valConstRef)) {
1071 throw PredicateDoesNotObtain();
// Base case of thenMulti: a single callback is forwarded straight to then().
1078 template <class Callback>
1079 auto Future<T>::thenMulti(Callback&& fn)
1080 -> decltype(this->then(std::forward<Callback>(fn))) {
1081 // thenMulti with one callback is just a then
1082 return then(std::forward<Callback>(fn));
// Recursive case of thenMulti: chains the first callback with then() and
// hands the remaining callbacks to thenMulti on the resulting Future.
1086 template <class Callback, class... Callbacks>
1087 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1088 -> decltype(this->then(std::forward<Callback>(fn)).
1089 thenMulti(std::forward<Callbacks>(fns)...)) {
1090 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1091 return then(std::forward<Callback>(fn)).
1092 thenMulti(std::forward<Callbacks>(fns)...);
// thenMultiWithExecutor with several callbacks: remembers the current executor
// in oldX, chains all callbacks via then()/thenMulti(), and finishes with
// via(oldX) so the returned Future is back on the original executor.
// NOTE(review): the statement that switches to `x` is not visible in this
// excerpt.
1096 template <class Callback, class... Callbacks>
1097 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1099 -> decltype(this->then(std::forward<Callback>(fn)).
1100 thenMulti(std::forward<Callbacks>(fns)...)) {
1101 // thenMultiExecutor with two callbacks is
1102 // via(x).then(a).thenMulti(b, ...).via(oldX)
1103 auto oldX = getExecutor();
1105 return then(std::forward<Callback>(fn)).
1106 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
// Base case of thenMultiWithExecutor: a single callback is forwarded to the
// executor-taking then(x, fn) overload.
1110 template <class Callback>
1111 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1112 -> decltype(this->then(std::forward<Callback>(fn))) {
1113 // thenMulti with one callback is just a then with an executor
1114 return then(x, std::forward<Callback>(fn));
// when(p, thunk): if p is true, calls thunk and returns its Future converted
// with unit(); otherwise returns makeFuture() without calling thunk.
1118 inline Future<Unit> when(bool p, F thunk) {
1119 return p ? thunk().unit() : makeFuture();
// whileDo(predicate, thunk): asynchronous loop. One branch runs thunk and,
// once its Future completes, calls whileDo again with copies of predicate and
// thunk; the other branch returns makeFuture().
// NOTE(review): the condition choosing between the branches is not visible in
// this excerpt (presumably predicate()).
1122 template <class P, class F>
1123 Future<Unit> whileDo(P predicate, F thunk) {
1125 return thunk().then([=] {
1126 return whileDo(predicate, thunk);
1129 return makeFuture();
// times(n, thunk): built on folly::whileDo with a predicate that increments a
// heap-allocated atomic counter and stays true while its previous value is
// below n. The counter lives in a unique_ptr inside a MoveWrapper so that it
// can be captured by the predicate lambda.
1133 Future<Unit> times(const int n, F thunk) {
1134 auto count = folly::makeMoveWrapper(
1135 std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1137 return folly::whileDo([=]() mutable {
1138 return (*count)->fetch_add(1) < n;
1143 template <class It, class F, class ItT, class Result>
1144 std::vector<Future<Result>> map(It first, It last, F func) {
1145 std::vector<Future<Result>> results;
1146 for (auto it = first; it != last; it++) {
1147 results.push_back(it->then(func));
// Empty tag types used to pick an overload by policy kind: "raw" for policies
// whose call operator returns bool, "fut" for those returning Future<bool>
// (see retrying_policy_traits::tag).
1157 struct retrying_policy_raw_tag {};
1158 struct retrying_policy_fut_tag {};
// Classifies a retry Policy by the return type of its operator()(size_t,
// const exception_wrapper&), const-qualified or not:
//  - is_raw: the operator returns bool
//  - is_fut: the operator returns Future<bool>
//  - tag:    retrying_policy_raw_tag if is_raw, else retrying_policy_fut_tag
//            if is_fut, else void
1160 template <class Policy>
1161 struct retrying_policy_traits {
1162 using ew = exception_wrapper;
1163 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1164 template <class Ret>
1165 using has_op = typename std::integral_constant<bool,
1166 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1167 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1168 using is_raw = has_op<bool>;
1169 using is_fut = has_op<Future<bool>>;
1170 using tag = typename std::conditional<
1171 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1172 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// Core retry loop. F is the Future type returned by ff(size_t) and T its value
// type. On error the policy is consulted with the attempt counter k and the
// exception; its Future<bool> answer decides between another retrying(...)
// round and a failed Future carrying the original exception.
// NOTE(review): the definition of `f` and the condition of the conditional
// expression are not visible in this excerpt. Also, p and ff are handed to
// makeMoveWrapper without std::move, which copies lvalue arguments.
1175 template <class Policy, class FF>
1176 typename std::result_of<FF(size_t)>::type
1177 retrying(size_t k, Policy&& p, FF&& ff) {
1178 using F = typename std::result_of<FF(size_t)>::type;
1179 using T = typename F::value_type;
1181 auto pm = makeMoveWrapper(p);
1182 auto ffm = makeMoveWrapper(ff);
1183 return f.onError([=](exception_wrapper x) mutable {
1184 auto q = (*pm)(k, x);
1185 auto xm = makeMoveWrapper(std::move(x));
1186 return q.then([=](bool r) mutable {
1188 ? retrying(k, pm.move(), ffm.move())
1189 : makeFuture<T>(xm.move());
// Tag-dispatched entry for "raw" policies (returning bool): adapts the policy
// into one returning Future<bool> by wrapping its answer in makeFuture, then
// starts the retry loop at attempt 0.
1194 template <class Policy, class FF>
1195 typename std::result_of<FF(size_t)>::type
1196 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1197 auto pm = makeMoveWrapper(std::move(p));
1198 auto q = [=](size_t k, exception_wrapper x) {
1199 return makeFuture<bool>((*pm)(k, x));
1201 return retrying(0, std::move(q), std::forward<FF>(ff));
// Tag-dispatched entry for policies that already return Future<bool>: starts
// the retry loop at attempt 0 with the policy forwarded unchanged.
1204 template <class Policy, class FF>
1205 typename std::result_of<FF(size_t)>::type
1206 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1207 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1210 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1211 template <class URNG>
1212 Duration retryingJitteredExponentialBackoffDur(
1214 Duration backoff_min,
1215 Duration backoff_max,
1216 double jitter_param,
1219 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1220 auto jitter = std::exp(dist(rng));
1221 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1222 return std::max(backoff_min, std::min(backoff_max, backoff));
// Builds a policy returning Future<bool> that combines a retry cap, a
// user-supplied policy and jittered exponential backoff. The returned
// callable answers false when the attempt number equals max_tries or when the
// wrapped policy answers false; otherwise it sleeps for the computed backoff
// and then answers true. The generator is kept in a shared_ptr so that all
// copies of the lambda advance the same state.
// NOTE(review): the cap is tested with n == max_tries, whereas
// retryingPolicyBasic uses n < max_tries; an attempt number that starts above
// max_tries would never hit the cap -- confirm whether >= is intended.
// NOTE(review): part of the parameter list is not visible in this excerpt.
1225 template <class Policy, class URNG>
1226 std::function<Future<bool>(size_t, const exception_wrapper&)>
1227 retryingPolicyCappedJitteredExponentialBackoff(
1229 Duration backoff_min,
1230 Duration backoff_max,
1231 double jitter_param,
1234 auto pm = makeMoveWrapper(std::move(p));
1235 auto rngp = std::make_shared<URNG>(std::move(rng));
1236 return [=](size_t n, const exception_wrapper& ex) mutable {
1237 if (n == max_tries) { return makeFuture(false); }
1238 return (*pm)(n, ex).then([=](bool v) {
1239 if (!v) { return makeFuture(false); }
1240 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1241 n, backoff_min, backoff_max, jitter_param, *rngp);
1242 return futures::sleep(backoff).then([] { return true; });
// Tag-dispatched overload for "raw" policies (returning bool): wraps the
// policy so that its answer is delivered through makeFuture, then delegates to
// another retryingPolicyCappedJitteredExponentialBackoff overload.
// NOTE(review): part of the parameter list and the arguments of the
// delegating call are not visible in this excerpt.
1247 template <class Policy, class URNG>
1248 std::function<Future<bool>(size_t, const exception_wrapper&)>
1249 retryingPolicyCappedJitteredExponentialBackoff(
1251 Duration backoff_min,
1252 Duration backoff_max,
1253 double jitter_param,
1256 retrying_policy_raw_tag) {
1257 auto pm = makeMoveWrapper(std::move(p));
1258 auto q = [=](size_t n, const exception_wrapper& e) {
1259 return makeFuture((*pm)(n, e));
1261 return retryingPolicyCappedJitteredExponentialBackoff(
// Tag-dispatched overload for policies already returning Future<bool>:
// delegates to another retryingPolicyCappedJitteredExponentialBackoff
// overload.
// NOTE(review): part of the parameter list and the arguments of the
// delegating call are not visible in this excerpt.
1270 template <class Policy, class URNG>
1271 std::function<Future<bool>(size_t, const exception_wrapper&)>
1272 retryingPolicyCappedJitteredExponentialBackoff(
1274 Duration backoff_min,
1275 Duration backoff_max,
1276 double jitter_param,
1279 retrying_policy_fut_tag) {
1280 return retryingPolicyCappedJitteredExponentialBackoff(
// Public retrying(p, ff): looks up the policy's tag through
// detail::retrying_policy_traits and forwards to the matching
// detail::retrying overload. The return type is that of ff(size_t).
1291 template <class Policy, class FF>
1292 typename std::result_of<FF(size_t)>::type
1293 retrying(Policy&& p, FF&& ff) {
1294 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1295 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
// Basic policy: answers true while the attempt number n is below max_tries;
// the exception argument is ignored.
// NOTE(review): the parameter list is not visible in this excerpt.
1299 std::function<bool(size_t, const exception_wrapper&)>
1300 retryingPolicyBasic(
1302 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
// Public overload taking a user policy: looks up the policy's tag through
// detail::retrying_policy_traits and forwards to the detail implementation.
// NOTE(review): part of the parameter list and the arguments of the
// delegating call are not visible in this excerpt.
1305 template <class Policy, class URNG>
1306 std::function<Future<bool>(size_t, const exception_wrapper&)>
1307 retryingPolicyCappedJitteredExponentialBackoff(
1309 Duration backoff_min,
1310 Duration backoff_max,
1311 double jitter_param,
1314 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1315 return detail::retryingPolicyCappedJitteredExponentialBackoff(
// Public overload without a user policy: supplies a policy that always
// answers true, so only the cap and the backoff limit retries.
// NOTE(review): part of the parameter list and the arguments of the
// delegating call are not visible in this excerpt.
1326 std::function<Future<bool>(size_t, const exception_wrapper&)>
1327 retryingPolicyCappedJitteredExponentialBackoff(
1329 Duration backoff_min,
1330 Duration backoff_max,
1331 double jitter_param) {
1332 auto p = [](size_t, const exception_wrapper&) { return true; };
1333 return retryingPolicyCappedJitteredExponentialBackoff(
// Explicit instantiation declarations: including this file does not
// implicitly instantiate Future for the listed types. The matching explicit
// instantiation definitions must exist in some translation unit, which is not
// visible here.
1344 // Instantiate the most common Future types to save compile time
1345 extern template class Future<Unit>;
1346 extern template class Future<bool>;
1347 extern template class Future<int>;
1348 extern template class Future<int64_t>;
1349 extern template class Future<std::string>;
1350 extern template class Future<double>;
1352 } // namespace folly