2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/experimental/fibers/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
36 Timekeeper* getTimekeeperSingleton();
40 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
41 other.core_ = nullptr;
45 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
46 std::swap(core_, other.core_);
51 template <class T2, typename>
52 Future<T>::Future(T2&& val)
53 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
56 template <typename, typename>
58 : core_(new detail::Core<T>(Try<T>(T()))) {}
61 Future<T>::~Future() {
66 void Future<T>::detach() {
68 core_->detachFuture();
74 void Future<T>::throwIfInvalid() const {
81 void Future<T>::setCallback_(F&& func) {
83 core_->setCallback(std::move(func));
90 typename std::enable_if<isFuture<F>::value,
91 Future<typename isFuture<T>::Inner>>::type
93 return then([](Future<typename isFuture<T>::Inner> internal_future) {
94 return internal_future;
100 // Variant: returns a value
101 // e.g. f.then([](Try<T>&& t){ return t.value(); });
103 template <typename F, typename R, bool isTry, typename... Args>
104 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
105 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
106 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
107 typedef typename R::ReturnsFuture::Inner B;
111 // wrap these so we can move them into the lambda
112 folly::MoveWrapper<Promise<B>> p;
113 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
114 folly::MoveWrapper<F> funcm(std::forward<F>(func));
116 // grab the Future now before we lose our handle on the Promise
117 auto f = p->getFuture();
118 f.core_->setExecutorNoLock(getExecutor());
120 /* This is a bit tricky.
122 We can't just close over *this in case this Future gets moved. So we
123 make a new dummy Future. We could figure out something more
124 sophisticated that avoids making a new Future object when it can, as an
125 optimization. But this is correct.
127 core_ can't be moved, it is explicitly disallowed (as is copying). But
128 if there's ever a reason to allow it, this is one place that makes that
129 assumption and would need to be fixed. We use a standard shared pointer
130 for core_ (by copying it in), which means in essence obj holds a shared
131 pointer to itself. But this shouldn't leak because Promise will not
132 outlive the continuation, because Promise will setException() with a
133 broken Promise if it is destructed before completed. We could use a
134 weak pointer but it would have to be converted to a shared pointer when
135 func is executed (because the Future returned by func may possibly
136 persist beyond the callback, if it gets moved), and so it is an
137 optimization to just make it shared from the get-go.
139 We have to move in the Promise and func using the MoveWrapper
140 hack. (func could be copied but it's a big drag on perf).
142 Two subtle but important points about this design. detail::Core has no
143 back pointers to Future or Promise, so if Future or Promise get moved
144 (and they will be moved in performant code) we don't have to do
145 anything fancy. And because we store the continuation in the
146 detail::Core, not in the Future, we can execute the continuation even
147 after the Future has gone out of scope. This is an intentional design
148 decision. It is likely we will want to be able to cancel a continuation
149 in some circumstances, but I think it should be explicit not implicit
150 in the destruction of the Future used to create it.
153 [p, funcm](Try<T>&& t) mutable {
154 if (!isTry && t.hasException()) {
155 p->setException(std::move(t.exception()));
158 return (*funcm)(t.template get<isTry, Args>()...);
166 // Variant: returns a Future
167 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
169 template <typename F, typename R, bool isTry, typename... Args>
170 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
171 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
172 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
173 typedef typename R::ReturnsFuture::Inner B;
177 // wrap these so we can move them into the lambda
178 folly::MoveWrapper<Promise<B>> p;
179 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
180 folly::MoveWrapper<F> funcm(std::forward<F>(func));
182 // grab the Future now before we lose our handle on the Promise
183 auto f = p->getFuture();
184 f.core_->setExecutorNoLock(getExecutor());
187 [p, funcm](Try<T>&& t) mutable {
188 if (!isTry && t.hasException()) {
189 p->setException(std::move(t.exception()));
192 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
193 // that didn't throw, now we can steal p
194 f2.setCallback_([p](Try<B>&& b) mutable {
195 p->setTry(std::move(b));
197 } catch (const std::exception& e) {
198 p->setException(exception_wrapper(std::current_exception(), e));
200 p->setException(exception_wrapper(std::current_exception()));
208 template <typename T>
209 template <typename R, typename Caller, typename... Args>
210 Future<typename isFuture<R>::Inner>
211 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
212 typedef typename std::remove_cv<
213 typename std::remove_reference<
214 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
215 return then([instance, func](Try<T>&& t){
216 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
221 template <class Executor, class Arg, class... Args>
222 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
223 -> decltype(this->then(std::forward<Arg>(arg),
224 std::forward<Args>(args)...))
226 auto oldX = getExecutor();
228 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
233 Future<Unit> Future<T>::then() {
234 return then([] () {});
237 // onError where the callback returns T
240 typename std::enable_if<
241 !detail::callableWith<F, exception_wrapper>::value &&
242 !detail::Extract<F>::ReturnsFuture::value,
244 Future<T>::onError(F&& func) {
245 typedef typename detail::Extract<F>::FirstArg Exn;
247 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
248 "Return type of onError callback must be T or Future<T>");
251 auto f = p.getFuture();
252 auto pm = folly::makeMoveWrapper(std::move(p));
253 auto funcm = folly::makeMoveWrapper(std::move(func));
254 setCallback_([pm, funcm](Try<T>&& t) mutable {
255 if (!t.template withException<Exn>([&] (Exn& e) {
260 pm->setTry(std::move(t));
267 // onError where the callback returns Future<T>
270 typename std::enable_if<
271 !detail::callableWith<F, exception_wrapper>::value &&
272 detail::Extract<F>::ReturnsFuture::value,
274 Future<T>::onError(F&& func) {
276 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
277 "Return type of onError callback must be T or Future<T>");
278 typedef typename detail::Extract<F>::FirstArg Exn;
281 auto f = p.getFuture();
282 auto pm = folly::makeMoveWrapper(std::move(p));
283 auto funcm = folly::makeMoveWrapper(std::move(func));
284 setCallback_([pm, funcm](Try<T>&& t) mutable {
285 if (!t.template withException<Exn>([&] (Exn& e) {
287 auto f2 = (*funcm)(e);
288 f2.setCallback_([pm](Try<T>&& t2) mutable {
289 pm->setTry(std::move(t2));
291 } catch (const std::exception& e2) {
292 pm->setException(exception_wrapper(std::current_exception(), e2));
294 pm->setException(exception_wrapper(std::current_exception()));
297 pm->setTry(std::move(t));
306 Future<T> Future<T>::ensure(F func) {
307 MoveWrapper<F> funcw(std::move(func));
308 return this->then([funcw](Try<T>&& t) mutable {
310 return makeFuture(std::move(t));
316 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
317 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
318 return within(dur, tk)
319 .onError([funcw](TimedOut const&) { return (*funcw)(); });
324 typename std::enable_if<
325 detail::callableWith<F, exception_wrapper>::value &&
326 detail::Extract<F>::ReturnsFuture::value,
328 Future<T>::onError(F&& func) {
330 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
331 "Return type of onError callback must be T or Future<T>");
334 auto f = p.getFuture();
335 auto pm = folly::makeMoveWrapper(std::move(p));
336 auto funcm = folly::makeMoveWrapper(std::move(func));
337 setCallback_([pm, funcm](Try<T> t) mutable {
338 if (t.hasException()) {
340 auto f2 = (*funcm)(std::move(t.exception()));
341 f2.setCallback_([pm](Try<T> t2) mutable {
342 pm->setTry(std::move(t2));
344 } catch (const std::exception& e2) {
345 pm->setException(exception_wrapper(std::current_exception(), e2));
347 pm->setException(exception_wrapper(std::current_exception()));
350 pm->setTry(std::move(t));
357 // onError(exception_wrapper) that returns T
360 typename std::enable_if<
361 detail::callableWith<F, exception_wrapper>::value &&
362 !detail::Extract<F>::ReturnsFuture::value,
364 Future<T>::onError(F&& func) {
366 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
367 "Return type of onError callback must be T or Future<T>");
370 auto f = p.getFuture();
371 auto pm = folly::makeMoveWrapper(std::move(p));
372 auto funcm = folly::makeMoveWrapper(std::move(func));
373 setCallback_([pm, funcm](Try<T> t) mutable {
374 if (t.hasException()) {
376 return (*funcm)(std::move(t.exception()));
379 pm->setTry(std::move(t));
387 typename std::add_lvalue_reference<T>::type Future<T>::value() {
390 return core_->getTry().value();
394 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
397 return core_->getTry().value();
401 Try<T>& Future<T>::getTry() {
404 return core_->getTry();
408 Optional<Try<T>> Future<T>::poll() {
410 if (core_->ready()) {
411 o = std::move(core_->getTry());
417 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
420 setExecutor(executor, priority);
422 return std::move(*this);
426 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
429 MoveWrapper<Promise<T>> p;
430 auto f = p->getFuture();
431 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
432 return std::move(f).via(executor, priority);
436 template <class Func>
437 auto via(Executor* x, Func func)
438 -> Future<typename isFuture<decltype(func())>::Inner>
440 // TODO make this actually more performant. :-P #7260175
441 return via(x).then(func);
445 bool Future<T>::isReady() const {
447 return core_->ready();
451 bool Future<T>::hasValue() {
452 return getTry().hasValue();
456 bool Future<T>::hasException() {
457 return getTry().hasException();
461 void Future<T>::raise(exception_wrapper exception) {
462 core_->raise(std::move(exception));
468 Future<typename std::decay<T>::type> makeFuture(T&& t) {
469 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
472 inline // for multiple translation units
473 Future<Unit> makeFuture() {
474 return makeFuture(Unit{});
478 auto makeFutureWith(F&& func)
479 -> Future<typename Unit::Lift<decltype(func())>::type> {
480 using LiftedResult = typename Unit::Lift<decltype(func())>::type;
481 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
487 Future<T> makeFuture(std::exception_ptr const& e) {
488 return makeFuture(Try<T>(e));
492 Future<T> makeFuture(exception_wrapper ew) {
493 return makeFuture(Try<T>(std::move(ew)));
496 template <class T, class E>
497 typename std::enable_if<std::is_base_of<std::exception, E>::value,
499 makeFuture(E const& e) {
500 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
504 Future<T> makeFuture(Try<T>&& t) {
505 return Future<T>(new detail::Core<T>(std::move(t)));
509 Future<Unit> via(Executor* executor, int8_t priority) {
510 return makeFuture().via(executor, priority);
513 // mapSetCallback calls func(i, Try<T>) when every future completes
515 template <class T, class InputIterator, class F>
516 void mapSetCallback(InputIterator first, InputIterator last, F func) {
517 for (size_t i = 0; first != last; ++first, ++i) {
518 first->setCallback_([func, i](Try<T>&& t) {
519 func(i, std::move(t));
524 // collectAll (variadic)
526 template <typename... Fs>
527 typename detail::CollectAllVariadicContext<
528 typename std::decay<Fs>::type::value_type...>::type
529 collectAll(Fs&&... fs) {
530 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
531 typename std::decay<Fs>::type::value_type...>>();
532 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
533 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
534 return ctx->p.getFuture();
537 // collectAll (iterator)
539 template <class InputIterator>
542 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
543 collectAll(InputIterator first, InputIterator last) {
545 typename std::iterator_traits<InputIterator>::value_type::value_type T;
547 struct CollectAllContext {
548 CollectAllContext(int n) : results(n) {}
549 ~CollectAllContext() {
550 p.setValue(std::move(results));
552 Promise<std::vector<Try<T>>> p;
553 std::vector<Try<T>> results;
556 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
557 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
558 ctx->results[i] = std::move(t);
560 return ctx->p.getFuture();
563 // collect (iterator)
567 template <typename T>
568 struct CollectContext {
569 struct Nothing { explicit Nothing(int n) {} };
571 using Result = typename std::conditional<
572 std::is_void<T>::value,
574 std::vector<T>>::type;
576 using InternalResult = typename std::conditional<
577 std::is_void<T>::value,
579 std::vector<Optional<T>>>::type;
581 explicit CollectContext(int n) : result(n) {}
583 if (!threw.exchange(true)) {
584 // map Optional<T> -> T
585 std::vector<T> finalResult;
586 finalResult.reserve(result.size());
587 std::transform(result.begin(), result.end(),
588 std::back_inserter(finalResult),
589 [](Optional<T>& o) { return std::move(o.value()); });
590 p.setValue(std::move(finalResult));
593 inline void setPartialResult(size_t i, Try<T>& t) {
594 result[i] = std::move(t.value());
597 InternalResult result;
598 std::atomic<bool> threw {false};
603 template <class InputIterator>
604 Future<typename detail::CollectContext<
605 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
606 collect(InputIterator first, InputIterator last) {
608 typename std::iterator_traits<InputIterator>::value_type::value_type T;
610 auto ctx = std::make_shared<detail::CollectContext<T>>(
611 std::distance(first, last));
612 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
613 if (t.hasException()) {
614 if (!ctx->threw.exchange(true)) {
615 ctx->p.setException(std::move(t.exception()));
617 } else if (!ctx->threw) {
618 ctx->setPartialResult(i, t);
621 return ctx->p.getFuture();
624 // collect (variadic)
626 template <typename... Fs>
627 typename detail::CollectVariadicContext<
628 typename std::decay<Fs>::type::value_type...>::type
629 collect(Fs&&... fs) {
630 auto ctx = std::make_shared<detail::CollectVariadicContext<
631 typename std::decay<Fs>::type::value_type...>>();
632 detail::collectVariadicHelper<detail::CollectVariadicContext>(
633 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
634 return ctx->p.getFuture();
637 // collectAny (iterator)
639 template <class InputIterator>
644 std::iterator_traits<InputIterator>::value_type::value_type>>>
645 collectAny(InputIterator first, InputIterator last) {
647 typename std::iterator_traits<InputIterator>::value_type::value_type T;
649 struct CollectAnyContext {
650 CollectAnyContext() {};
651 Promise<std::pair<size_t, Try<T>>> p;
652 std::atomic<bool> done {false};
655 auto ctx = std::make_shared<CollectAnyContext>();
656 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
657 if (!ctx->done.exchange(true)) {
658 ctx->p.setValue(std::make_pair(i, std::move(t)));
661 return ctx->p.getFuture();
664 // collectN (iterator)
666 template <class InputIterator>
667 Future<std::vector<std::pair<size_t, Try<typename
668 std::iterator_traits<InputIterator>::value_type::value_type>>>>
669 collectN(InputIterator first, InputIterator last, size_t n) {
671 std::iterator_traits<InputIterator>::value_type::value_type T;
672 typedef std::vector<std::pair<size_t, Try<T>>> V;
674 struct CollectNContext {
676 std::atomic<size_t> completed = {0};
679 auto ctx = std::make_shared<CollectNContext>();
681 if (size_t(std::distance(first, last)) < n) {
682 ctx->p.setException(std::runtime_error("Not enough futures"));
684 // for each completed Future, increase count and add to vector, until we
685 // have n completed futures at which point we fulfil our Promise with the
687 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
688 auto c = ++ctx->completed;
690 assert(ctx->v.size() < n);
691 ctx->v.emplace_back(i, std::move(t));
693 ctx->p.setTry(Try<V>(std::move(ctx->v)));
699 return ctx->p.getFuture();
704 template <class It, class T, class F>
705 Future<T> reduce(It first, It last, T&& initial, F&& func) {
707 return makeFuture(std::move(initial));
710 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
711 typedef typename std::conditional<
712 detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
713 typedef isTry<Arg> IsTry;
715 folly::MoveWrapper<T> minitial(std::move(initial));
716 auto sfunc = std::make_shared<F>(std::move(func));
718 auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
719 return (*sfunc)(std::move(*minitial),
720 head.template get<IsTry::value, Arg&&>());
723 for (++first; first != last; ++first) {
724 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
725 return (*sfunc)(std::move(std::get<0>(t).value()),
726 // Either return a ItT&& or a Try<ItT>&& depending
727 // on the type of the argument of func.
728 std::get<1>(t).template get<IsTry::value, Arg&&>());
735 // window (collection)
737 template <class Collection, class F, class ItT, class Result>
738 std::vector<Future<Result>>
739 window(Collection input, F func, size_t n) {
740 struct WindowContext {
741 WindowContext(Collection&& i, F&& fn)
742 : input_(std::move(i)), promises_(input_.size()),
745 std::atomic<size_t> i_ {0};
747 std::vector<Promise<Result>> promises_;
750 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
751 size_t i = ctx->i_++;
752 if (i < ctx->input_.size()) {
753 // Using setCallback_ directly since we don't need the Future
754 ctx->func_(std::move(ctx->input_[i])).setCallback_(
755 // ctx is captured by value
756 [ctx, i](Try<Result>&& t) {
757 ctx->promises_[i].setTry(std::move(t));
758 // Chain another future onto this one
759 spawn(std::move(ctx));
765 auto max = std::min(n, input.size());
767 auto ctx = std::make_shared<WindowContext>(
768 std::move(input), std::move(func));
770 for (size_t i = 0; i < max; ++i) {
771 // Start the first n Futures
772 WindowContext::spawn(ctx);
775 std::vector<Future<Result>> futures;
776 futures.reserve(ctx->promises_.size());
777 for (auto& promise : ctx->promises_) {
778 futures.emplace_back(promise.getFuture());
787 template <class I, class F>
788 Future<I> Future<T>::reduce(I&& initial, F&& func) {
789 folly::MoveWrapper<I> minitial(std::move(initial));
790 folly::MoveWrapper<F> mfunc(std::move(func));
791 return then([minitial, mfunc](T& vals) mutable {
792 auto ret = std::move(*minitial);
793 for (auto& val : vals) {
794 ret = (*mfunc)(std::move(ret), std::move(val));
800 // unorderedReduce (iterator)
802 template <class It, class T, class F, class ItT, class Arg>
803 Future<T> unorderedReduce(It first, It last, T initial, F func) {
805 return makeFuture(std::move(initial));
808 typedef isTry<Arg> IsTry;
810 struct UnorderedReduceContext {
811 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
812 : lock_(), memo_(makeFuture<T>(std::move(memo))),
813 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
815 folly::MicroSpinLock lock_; // protects memo_ and numThens_
818 size_t numThens_; // how many Futures completed and called .then()
819 size_t numFutures_; // how many Futures in total
823 auto ctx = std::make_shared<UnorderedReduceContext>(
824 std::move(initial), std::move(func), std::distance(first, last));
826 mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
827 folly::MoveWrapper<Try<ItT>> mt(std::move(t));
828 // Futures can be completed in any order, simultaneously.
829 // To make this non-blocking, we create a new Future chain in
830 // the order of completion to reduce the values.
831 // The spinlock just protects chaining a new Future, not actually
832 // executing the reduce, which should be really fast.
833 folly::MSLGuard lock(ctx->lock_);
834 ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
835 // Either return a ItT&& or a Try<ItT>&& depending
836 // on the type of the argument of func.
837 return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
839 if (++ctx->numThens_ == ctx->numFutures_) {
840 // After reducing the value of the last Future, fulfill the Promise
841 ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
842 ctx->promise_.setValue(std::move(t2));
847 return ctx->promise_.getFuture();
853 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
854 return within(dur, TimedOut(), tk);
859 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
862 Context(E ex) : exception(std::move(ex)), promise() {}
864 Future<Unit> thisFuture;
866 std::atomic<bool> token {false};
870 tk = folly::detail::getTimekeeperSingleton();
873 auto ctx = std::make_shared<Context>(std::move(e));
875 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
876 // TODO: "this" completed first, cancel "after"
877 if (ctx->token.exchange(true) == false) {
878 ctx->promise.setTry(std::move(t));
882 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
883 // "after" completed first, cancel "this"
884 ctx->thisFuture.raise(TimedOut());
885 if (ctx->token.exchange(true) == false) {
886 if (t.hasException()) {
887 ctx->promise.setException(std::move(t.exception()));
889 ctx->promise.setException(std::move(ctx->exception));
894 return ctx->promise.getFuture().via(getExecutor());
900 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
901 return collectAll(*this, futures::sleep(dur, tk))
902 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
903 Try<T>& t = std::get<0>(tup);
904 return makeFuture<T>(std::move(t));
911 void waitImpl(Future<T>& f) {
912 // short-circuit if there's nothing to do
913 if (f.isReady()) return;
915 folly::fibers::Baton baton;
916 f = f.then([&](Try<T> t) {
918 return makeFuture(std::move(t));
922 // There's a race here between the return here and the actual finishing of
923 // the future. f is completed, but the setup may not have finished on done
924 // after the baton has posted.
925 while (!f.isReady()) {
926 std::this_thread::yield();
931 void waitImpl(Future<T>& f, Duration dur) {
932 // short-circuit if there's nothing to do
933 if (f.isReady()) return;
935 auto baton = std::make_shared<folly::fibers::Baton>();
936 f = f.then([baton](Try<T> t) {
938 return makeFuture(std::move(t));
941 // Let's preserve the invariant that if we did not timeout (timed_wait returns
942 // true), then the returned Future is complete when it is returned to the
943 // caller. We need to wait out the race for that Future to complete.
944 if (baton->timed_wait(dur)) {
945 while (!f.isReady()) {
946 std::this_thread::yield();
952 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
953 while (!f.isReady()) {
961 Future<T>& Future<T>::wait() & {
962 detail::waitImpl(*this);
967 Future<T>&& Future<T>::wait() && {
968 detail::waitImpl(*this);
969 return std::move(*this);
973 Future<T>& Future<T>::wait(Duration dur) & {
974 detail::waitImpl(*this, dur);
979 Future<T>&& Future<T>::wait(Duration dur) && {
980 detail::waitImpl(*this, dur);
981 return std::move(*this);
985 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
986 detail::waitViaImpl(*this, e);
991 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
992 detail::waitViaImpl(*this, e);
993 return std::move(*this);
998 return std::move(wait().value());
1002 T Future<T>::get(Duration dur) {
1005 return std::move(value());
1012 T Future<T>::getVia(DrivableExecutor* e) {
1013 return std::move(waitVia(e).value());
1019 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1020 return t1.value() == t2.value();
1026 Future<bool> Future<T>::willEqual(Future<T>& f) {
1027 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1028 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1029 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1038 Future<T> Future<T>::filter(F predicate) {
1039 auto p = folly::makeMoveWrapper(std::move(predicate));
1040 return this->then([p](T val) {
1041 T const& valConstRef = val;
1042 if (!(*p)(valConstRef)) {
1043 throw PredicateDoesNotObtain();
1050 template <class Callback>
1051 auto Future<T>::thenMulti(Callback&& fn)
1052 -> decltype(this->then(std::forward<Callback>(fn))) {
1053 // thenMulti with one callback is just a then
1054 return then(std::forward<Callback>(fn));
1058 template <class Callback, class... Callbacks>
1059 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1060 -> decltype(this->then(std::forward<Callback>(fn)).
1061 thenMulti(std::forward<Callbacks>(fns)...)) {
1062 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1063 return then(std::forward<Callback>(fn)).
1064 thenMulti(std::forward<Callbacks>(fns)...);
1068 template <class Callback, class... Callbacks>
1069 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1071 -> decltype(this->then(std::forward<Callback>(fn)).
1072 thenMulti(std::forward<Callbacks>(fns)...)) {
1073 // thenMultiExecutor with two callbacks is
1074 // via(x).then(a).thenMulti(b, ...).via(oldX)
1075 auto oldX = getExecutor();
1077 return then(std::forward<Callback>(fn)).
1078 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1082 template <class Callback>
1083 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1084 -> decltype(this->then(std::forward<Callback>(fn))) {
1085 // thenMulti with one callback is just a then with an executor
1086 return then(x, std::forward<Callback>(fn));
1090 template <class It, class F, class ItT, class Result>
1091 std::vector<Future<Result>> map(It first, It last, F func) {
1092 std::vector<Future<Result>> results;
1093 for (auto it = first; it != last; it++) {
1094 results.push_back(it->then(func));
1104 struct retrying_policy_raw_tag {};
1105 struct retrying_policy_fut_tag {};
1107 template <class Policy>
1108 struct retrying_policy_traits {
1109 using ew = exception_wrapper;
1110 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1111 template <class Ret>
1112 using has_op = typename std::integral_constant<bool,
1113 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1114 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1115 using is_raw = has_op<bool>;
1116 using is_fut = has_op<Future<bool>>;
1117 using tag = typename std::conditional<
1118 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1119 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1122 template <class Policy, class FF>
1123 typename std::result_of<FF(size_t)>::type
1124 retrying(size_t k, Policy&& p, FF&& ff) {
1125 using F = typename std::result_of<FF(size_t)>::type;
1126 using T = typename F::value_type;
1128 auto pm = makeMoveWrapper(p);
1129 auto ffm = makeMoveWrapper(ff);
1130 return f.onError([=](exception_wrapper x) mutable {
1131 auto q = (*pm)(k, x);
1132 auto xm = makeMoveWrapper(std::move(x));
1133 return q.then([=](bool r) mutable {
1135 ? retrying(k, pm.move(), ffm.move())
1136 : makeFuture<T>(xm.move());
1141 template <class Policy, class FF>
1142 typename std::result_of<FF(size_t)>::type
1143 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1144 auto pm = makeMoveWrapper(std::move(p));
1145 auto q = [=](size_t k, exception_wrapper x) {
1146 return makeFuture<bool>((*pm)(k, x));
1148 return retrying(0, std::move(q), std::forward<FF>(ff));
1151 template <class Policy, class FF>
1152 typename std::result_of<FF(size_t)>::type
1153 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1154 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1157 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1158 template <class URNG>
1159 Duration retryingJitteredExponentialBackoffDur(
1161 Duration backoff_min,
1162 Duration backoff_max,
1163 double jitter_param,
1166 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1167 auto jitter = std::exp(dist(rng));
1168 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1169 return std::max(backoff_min, std::min(backoff_max, backoff));
1172 template <class Policy, class URNG>
1173 std::function<Future<bool>(size_t, const exception_wrapper&)>
1174 retryingPolicyCappedJitteredExponentialBackoff(
1176 Duration backoff_min,
1177 Duration backoff_max,
1178 double jitter_param,
1181 auto pm = makeMoveWrapper(std::move(p));
1182 auto rngp = std::make_shared<URNG>(std::move(rng));
1183 return [=](size_t n, const exception_wrapper& ex) mutable {
1184 if (n == max_tries) { return makeFuture(false); }
1185 return (*pm)(n, ex).then([=](bool v) {
1186 if (!v) { return makeFuture(false); }
1187 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1188 n, backoff_min, backoff_max, jitter_param, *rngp);
1189 return futures::sleep(backoff).then([] { return true; });
1194 template <class Policy, class URNG>
1195 std::function<Future<bool>(size_t, const exception_wrapper&)>
1196 retryingPolicyCappedJitteredExponentialBackoff(
1198 Duration backoff_min,
1199 Duration backoff_max,
1200 double jitter_param,
1203 retrying_policy_raw_tag) {
1204 auto pm = makeMoveWrapper(std::move(p));
1205 auto q = [=](size_t n, const exception_wrapper& e) {
1206 return makeFuture((*pm)(n, e));
1208 return retryingPolicyCappedJitteredExponentialBackoff(
1217 template <class Policy, class URNG>
1218 std::function<Future<bool>(size_t, const exception_wrapper&)>
1219 retryingPolicyCappedJitteredExponentialBackoff(
1221 Duration backoff_min,
1222 Duration backoff_max,
1223 double jitter_param,
1226 retrying_policy_fut_tag) {
1227 return retryingPolicyCappedJitteredExponentialBackoff(
1238 template <class Policy, class FF>
1239 typename std::result_of<FF(size_t)>::type
1240 retrying(Policy&& p, FF&& ff) {
1241 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1242 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1246 std::function<bool(size_t, const exception_wrapper&)>
1247 retryingPolicyBasic(
1249 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1252 template <class Policy, class URNG>
1253 std::function<Future<bool>(size_t, const exception_wrapper&)>
1254 retryingPolicyCappedJitteredExponentialBackoff(
1256 Duration backoff_min,
1257 Duration backoff_max,
1258 double jitter_param,
1261 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1262 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1273 std::function<Future<bool>(size_t, const exception_wrapper&)>
1274 retryingPolicyCappedJitteredExponentialBackoff(
1276 Duration backoff_min,
1277 Duration backoff_max,
1278 double jitter_param) {
1279 auto p = [](size_t, const exception_wrapper&) { return true; };
1280 return retryingPolicyCappedJitteredExponentialBackoff(
1291 // Instantiate the most common Future types to save compile time
1292 extern template class Future<Unit>;
1293 extern template class Future<bool>;
1294 extern template class Future<int>;
1295 extern template class Future<int64_t>;
1296 extern template class Future<std::string>;
1297 extern template class Future<double>;
1299 } // namespace folly