2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
31 #if defined(__ANDROID__) || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/experimental/fibers/Baton.h>
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56 other.core_ = nullptr;
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61 std::swap(core_, other.core_);
66 template <class T2, typename>
67 Future<T>::Future(T2&& val)
68 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
71 template <typename, typename>
73 : core_(new detail::Core<T>(Try<T>(T()))) {}
76 Future<T>::~Future() {
81 void Future<T>::detach() {
83 core_->detachFuture();
89 void Future<T>::throwIfInvalid() const {
96 void Future<T>::setCallback_(F&& func) {
98 core_->setCallback(std::move(func));
105 typename std::enable_if<isFuture<F>::value,
106 Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108 return then([](Future<typename isFuture<T>::Inner> internal_future) {
109 return internal_future;
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
121 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122 typedef typename R::ReturnsFuture::Inner B;
126 // wrap these so we can move them into the lambda
127 folly::MoveWrapper<Promise<B>> p;
128 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
129 folly::MoveWrapper<F> funcm(std::forward<F>(func));
131 // grab the Future now before we lose our handle on the Promise
132 auto f = p->getFuture();
133 f.core_->setExecutorNoLock(getExecutor());
135 /* This is a bit tricky.
137 We can't just close over *this in case this Future gets moved. So we
138 make a new dummy Future. We could figure out something more
139 sophisticated that avoids making a new Future object when it can, as an
140 optimization. But this is correct.
142 core_ can't be moved, it is explicitly disallowed (as is copying). But
143 if there's ever a reason to allow it, this is one place that makes that
144 assumption and would need to be fixed. We use a standard shared pointer
145 for core_ (by copying it in), which means in essence obj holds a shared
146 pointer to itself. But this shouldn't leak because Promise will not
147 outlive the continuation, because Promise will setException() with a
148 broken Promise if it is destructed before completed. We could use a
149 weak pointer but it would have to be converted to a shared pointer when
150 func is executed (because the Future returned by func may possibly
151 persist beyond the callback, if it gets moved), and so it is an
152 optimization to just make it shared from the get-go.
154 We have to move in the Promise and func using the MoveWrapper
155 hack. (func could be copied but it's a big drag on perf).
157 Two subtle but important points about this design. detail::Core has no
158 back pointers to Future or Promise, so if Future or Promise get moved
159 (and they will be moved in performant code) we don't have to do
160 anything fancy. And because we store the continuation in the
161 detail::Core, not in the Future, we can execute the continuation even
162 after the Future has gone out of scope. This is an intentional design
163 decision. It is likely we will want to be able to cancel a continuation
164 in some circumstances, but I think it should be explicit not implicit
165 in the destruction of the Future used to create it.
168 [p, funcm](Try<T>&& t) mutable {
169 if (!isTry && t.hasException()) {
170 p->setException(std::move(t.exception()));
173 return (*funcm)(t.template get<isTry, Args>()...);
181 // Variant: returns a Future
182 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
184 template <typename F, typename R, bool isTry, typename... Args>
185 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
186 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
187 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
188 typedef typename R::ReturnsFuture::Inner B;
192 // wrap these so we can move them into the lambda
193 folly::MoveWrapper<Promise<B>> p;
194 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
195 folly::MoveWrapper<F> funcm(std::forward<F>(func));
197 // grab the Future now before we lose our handle on the Promise
198 auto f = p->getFuture();
199 f.core_->setExecutorNoLock(getExecutor());
202 [p, funcm](Try<T>&& t) mutable {
203 if (!isTry && t.hasException()) {
204 p->setException(std::move(t.exception()));
207 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
208 // that didn't throw, now we can steal p
209 f2.setCallback_([p](Try<B>&& b) mutable {
210 p->setTry(std::move(b));
212 } catch (const std::exception& e) {
213 p->setException(exception_wrapper(std::current_exception(), e));
215 p->setException(exception_wrapper(std::current_exception()));
223 template <typename T>
224 template <typename R, typename Caller, typename... Args>
225 Future<typename isFuture<R>::Inner>
226 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
227 typedef typename std::remove_cv<
228 typename std::remove_reference<
229 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
230 return then([instance, func](Try<T>&& t){
231 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
236 template <class Executor, class Arg, class... Args>
237 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
238 -> decltype(this->then(std::forward<Arg>(arg),
239 std::forward<Args>(args)...))
241 auto oldX = getExecutor();
243 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
248 Future<Unit> Future<T>::then() {
249 return then([] () {});
252 // onError where the callback returns T
255 typename std::enable_if<
256 !detail::callableWith<F, exception_wrapper>::value &&
257 !detail::Extract<F>::ReturnsFuture::value,
259 Future<T>::onError(F&& func) {
260 typedef typename detail::Extract<F>::FirstArg Exn;
262 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
263 "Return type of onError callback must be T or Future<T>");
266 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
267 auto f = p.getFuture();
268 auto pm = folly::makeMoveWrapper(std::move(p));
269 auto funcm = folly::makeMoveWrapper(std::move(func));
270 setCallback_([pm, funcm](Try<T>&& t) mutable {
271 if (!t.template withException<Exn>([&] (Exn& e) {
276 pm->setTry(std::move(t));
283 // onError where the callback returns Future<T>
286 typename std::enable_if<
287 !detail::callableWith<F, exception_wrapper>::value &&
288 detail::Extract<F>::ReturnsFuture::value,
290 Future<T>::onError(F&& func) {
292 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
293 "Return type of onError callback must be T or Future<T>");
294 typedef typename detail::Extract<F>::FirstArg Exn;
297 auto f = p.getFuture();
298 auto pm = folly::makeMoveWrapper(std::move(p));
299 auto funcm = folly::makeMoveWrapper(std::move(func));
300 setCallback_([pm, funcm](Try<T>&& t) mutable {
301 if (!t.template withException<Exn>([&] (Exn& e) {
303 auto f2 = (*funcm)(e);
304 f2.setCallback_([pm](Try<T>&& t2) mutable {
305 pm->setTry(std::move(t2));
307 } catch (const std::exception& e2) {
308 pm->setException(exception_wrapper(std::current_exception(), e2));
310 pm->setException(exception_wrapper(std::current_exception()));
313 pm->setTry(std::move(t));
322 Future<T> Future<T>::ensure(F func) {
323 MoveWrapper<F> funcw(std::move(func));
324 return this->then([funcw](Try<T>&& t) mutable {
326 return makeFuture(std::move(t));
332 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
333 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
334 return within(dur, tk)
335 .onError([funcw](TimedOut const&) { return (*funcw)(); });
340 typename std::enable_if<
341 detail::callableWith<F, exception_wrapper>::value &&
342 detail::Extract<F>::ReturnsFuture::value,
344 Future<T>::onError(F&& func) {
346 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
347 "Return type of onError callback must be T or Future<T>");
350 auto f = p.getFuture();
351 auto pm = folly::makeMoveWrapper(std::move(p));
352 auto funcm = folly::makeMoveWrapper(std::move(func));
353 setCallback_([pm, funcm](Try<T> t) mutable {
354 if (t.hasException()) {
356 auto f2 = (*funcm)(std::move(t.exception()));
357 f2.setCallback_([pm](Try<T> t2) mutable {
358 pm->setTry(std::move(t2));
360 } catch (const std::exception& e2) {
361 pm->setException(exception_wrapper(std::current_exception(), e2));
363 pm->setException(exception_wrapper(std::current_exception()));
366 pm->setTry(std::move(t));
373 // onError(exception_wrapper) that returns T
376 typename std::enable_if<
377 detail::callableWith<F, exception_wrapper>::value &&
378 !detail::Extract<F>::ReturnsFuture::value,
380 Future<T>::onError(F&& func) {
382 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
383 "Return type of onError callback must be T or Future<T>");
386 auto f = p.getFuture();
387 auto pm = folly::makeMoveWrapper(std::move(p));
388 auto funcm = folly::makeMoveWrapper(std::move(func));
389 setCallback_([pm, funcm](Try<T> t) mutable {
390 if (t.hasException()) {
392 return (*funcm)(std::move(t.exception()));
395 pm->setTry(std::move(t));
403 typename std::add_lvalue_reference<T>::type Future<T>::value() {
406 return core_->getTry().value();
410 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
413 return core_->getTry().value();
417 Try<T>& Future<T>::getTry() {
420 return core_->getTry();
424 Optional<Try<T>> Future<T>::poll() {
426 if (core_->ready()) {
427 o = std::move(core_->getTry());
433 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
436 setExecutor(executor, priority);
438 return std::move(*this);
442 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
445 MoveWrapper<Promise<T>> p;
446 auto f = p->getFuture();
447 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
448 return std::move(f).via(executor, priority);
452 template <class Func>
453 auto via(Executor* x, Func func)
454 -> Future<typename isFuture<decltype(func())>::Inner>
456 // TODO make this actually more performant. :-P #7260175
457 return via(x).then(func);
461 bool Future<T>::isReady() const {
463 return core_->ready();
467 bool Future<T>::hasValue() {
468 return getTry().hasValue();
472 bool Future<T>::hasException() {
473 return getTry().hasException();
477 void Future<T>::raise(exception_wrapper exception) {
478 core_->raise(std::move(exception));
484 Future<typename std::decay<T>::type> makeFuture(T&& t) {
485 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
488 inline // for multiple translation units
489 Future<Unit> makeFuture() {
490 return makeFuture(Unit{});
493 // makeFutureWith(Future<T>()) -> Future<T>
495 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
496 typename std::result_of<F()>::type>::type
497 makeFutureWith(F&& func) {
499 typename isFuture<typename std::result_of<F()>::type>::Inner;
502 } catch (std::exception& e) {
503 return makeFuture<InnerType>(
504 exception_wrapper(std::current_exception(), e));
506 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
510 // makeFutureWith(T()) -> Future<T>
511 // makeFutureWith(void()) -> Future<Unit>
513 typename std::enable_if<
514 !(isFuture<typename std::result_of<F()>::type>::value),
515 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
516 makeFutureWith(F&& func) {
518 typename Unit::Lift<typename std::result_of<F()>::type>::type;
519 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
525 Future<T> makeFuture(std::exception_ptr const& e) {
526 return makeFuture(Try<T>(e));
530 Future<T> makeFuture(exception_wrapper ew) {
531 return makeFuture(Try<T>(std::move(ew)));
534 template <class T, class E>
535 typename std::enable_if<std::is_base_of<std::exception, E>::value,
537 makeFuture(E const& e) {
538 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
542 Future<T> makeFuture(Try<T>&& t) {
543 return Future<T>(new detail::Core<T>(std::move(t)));
547 Future<Unit> via(Executor* executor, int8_t priority) {
548 return makeFuture().via(executor, priority);
551 // mapSetCallback calls func(i, Try<T>) when every future completes
553 template <class T, class InputIterator, class F>
554 void mapSetCallback(InputIterator first, InputIterator last, F func) {
555 for (size_t i = 0; first != last; ++first, ++i) {
556 first->setCallback_([func, i](Try<T>&& t) {
557 func(i, std::move(t));
562 // collectAll (variadic)
564 template <typename... Fs>
565 typename detail::CollectAllVariadicContext<
566 typename std::decay<Fs>::type::value_type...>::type
567 collectAll(Fs&&... fs) {
568 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
569 typename std::decay<Fs>::type::value_type...>>();
570 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
571 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
572 return ctx->p.getFuture();
575 // collectAll (iterator)
577 template <class InputIterator>
580 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
581 collectAll(InputIterator first, InputIterator last) {
583 typename std::iterator_traits<InputIterator>::value_type::value_type T;
585 struct CollectAllContext {
586 CollectAllContext(int n) : results(n) {}
587 ~CollectAllContext() {
588 p.setValue(std::move(results));
590 Promise<std::vector<Try<T>>> p;
591 std::vector<Try<T>> results;
594 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
595 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
596 ctx->results[i] = std::move(t);
598 return ctx->p.getFuture();
601 // collect (iterator)
605 template <typename T>
606 struct CollectContext {
608 explicit Nothing(int /* n */) {}
611 using Result = typename std::conditional<
612 std::is_void<T>::value,
614 std::vector<T>>::type;
616 using InternalResult = typename std::conditional<
617 std::is_void<T>::value,
619 std::vector<Optional<T>>>::type;
621 explicit CollectContext(int n) : result(n) {}
623 if (!threw.exchange(true)) {
624 // map Optional<T> -> T
625 std::vector<T> finalResult;
626 finalResult.reserve(result.size());
627 std::transform(result.begin(), result.end(),
628 std::back_inserter(finalResult),
629 [](Optional<T>& o) { return std::move(o.value()); });
630 p.setValue(std::move(finalResult));
633 inline void setPartialResult(size_t i, Try<T>& t) {
634 result[i] = std::move(t.value());
637 InternalResult result;
638 std::atomic<bool> threw {false};
643 template <class InputIterator>
644 Future<typename detail::CollectContext<
645 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
646 collect(InputIterator first, InputIterator last) {
648 typename std::iterator_traits<InputIterator>::value_type::value_type T;
650 auto ctx = std::make_shared<detail::CollectContext<T>>(
651 std::distance(first, last));
652 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
653 if (t.hasException()) {
654 if (!ctx->threw.exchange(true)) {
655 ctx->p.setException(std::move(t.exception()));
657 } else if (!ctx->threw) {
658 ctx->setPartialResult(i, t);
661 return ctx->p.getFuture();
664 // collect (variadic)
666 template <typename... Fs>
667 typename detail::CollectVariadicContext<
668 typename std::decay<Fs>::type::value_type...>::type
669 collect(Fs&&... fs) {
670 auto ctx = std::make_shared<detail::CollectVariadicContext<
671 typename std::decay<Fs>::type::value_type...>>();
672 detail::collectVariadicHelper<detail::CollectVariadicContext>(
673 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
674 return ctx->p.getFuture();
677 // collectAny (iterator)
679 template <class InputIterator>
684 std::iterator_traits<InputIterator>::value_type::value_type>>>
685 collectAny(InputIterator first, InputIterator last) {
687 typename std::iterator_traits<InputIterator>::value_type::value_type T;
689 struct CollectAnyContext {
690 CollectAnyContext() {};
691 Promise<std::pair<size_t, Try<T>>> p;
692 std::atomic<bool> done {false};
695 auto ctx = std::make_shared<CollectAnyContext>();
696 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
697 if (!ctx->done.exchange(true)) {
698 ctx->p.setValue(std::make_pair(i, std::move(t)));
701 return ctx->p.getFuture();
704 // collectN (iterator)
706 template <class InputIterator>
707 Future<std::vector<std::pair<size_t, Try<typename
708 std::iterator_traits<InputIterator>::value_type::value_type>>>>
709 collectN(InputIterator first, InputIterator last, size_t n) {
711 std::iterator_traits<InputIterator>::value_type::value_type T;
712 typedef std::vector<std::pair<size_t, Try<T>>> V;
714 struct CollectNContext {
716 std::atomic<size_t> completed = {0};
719 auto ctx = std::make_shared<CollectNContext>();
721 if (size_t(std::distance(first, last)) < n) {
722 ctx->p.setException(std::runtime_error("Not enough futures"));
724 // for each completed Future, increase count and add to vector, until we
725 // have n completed futures at which point we fulfil our Promise with the
727 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
728 auto c = ++ctx->completed;
730 assert(ctx->v.size() < n);
731 ctx->v.emplace_back(i, std::move(t));
733 ctx->p.setTry(Try<V>(std::move(ctx->v)));
739 return ctx->p.getFuture();
744 template <class It, class T, class F>
745 Future<T> reduce(It first, It last, T&& initial, F&& func) {
747 return makeFuture(std::move(initial));
750 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
751 typedef typename std::conditional<
752 detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
753 typedef isTry<Arg> IsTry;
755 folly::MoveWrapper<T> minitial(std::move(initial));
756 auto sfunc = std::make_shared<F>(std::move(func));
758 auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
759 return (*sfunc)(std::move(*minitial),
760 head.template get<IsTry::value, Arg&&>());
763 for (++first; first != last; ++first) {
764 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
765 return (*sfunc)(std::move(std::get<0>(t).value()),
766 // Either return a ItT&& or a Try<ItT>&& depending
767 // on the type of the argument of func.
768 std::get<1>(t).template get<IsTry::value, Arg&&>());
775 // window (collection)
777 template <class Collection, class F, class ItT, class Result>
778 std::vector<Future<Result>>
779 window(Collection input, F func, size_t n) {
780 struct WindowContext {
781 WindowContext(Collection&& i, F&& fn)
782 : input_(std::move(i)), promises_(input_.size()),
785 std::atomic<size_t> i_ {0};
787 std::vector<Promise<Result>> promises_;
790 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
791 size_t i = ctx->i_++;
792 if (i < ctx->input_.size()) {
793 // Using setCallback_ directly since we don't need the Future
794 ctx->func_(std::move(ctx->input_[i])).setCallback_(
795 // ctx is captured by value
796 [ctx, i](Try<Result>&& t) {
797 ctx->promises_[i].setTry(std::move(t));
798 // Chain another future onto this one
799 spawn(std::move(ctx));
805 auto max = std::min(n, input.size());
807 auto ctx = std::make_shared<WindowContext>(
808 std::move(input), std::move(func));
810 for (size_t i = 0; i < max; ++i) {
811 // Start the first n Futures
812 WindowContext::spawn(ctx);
815 std::vector<Future<Result>> futures;
816 futures.reserve(ctx->promises_.size());
817 for (auto& promise : ctx->promises_) {
818 futures.emplace_back(promise.getFuture());
827 template <class I, class F>
828 Future<I> Future<T>::reduce(I&& initial, F&& func) {
829 folly::MoveWrapper<I> minitial(std::move(initial));
830 folly::MoveWrapper<F> mfunc(std::move(func));
831 return then([minitial, mfunc](T& vals) mutable {
832 auto ret = std::move(*minitial);
833 for (auto& val : vals) {
834 ret = (*mfunc)(std::move(ret), std::move(val));
840 // unorderedReduce (iterator)
842 template <class It, class T, class F, class ItT, class Arg>
843 Future<T> unorderedReduce(It first, It last, T initial, F func) {
845 return makeFuture(std::move(initial));
848 typedef isTry<Arg> IsTry;
850 struct UnorderedReduceContext {
851 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
852 : lock_(), memo_(makeFuture<T>(std::move(memo))),
853 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
855 folly::MicroSpinLock lock_; // protects memo_ and numThens_
858 size_t numThens_; // how many Futures completed and called .then()
859 size_t numFutures_; // how many Futures in total
863 auto ctx = std::make_shared<UnorderedReduceContext>(
864 std::move(initial), std::move(func), std::distance(first, last));
869 [ctx](size_t /* i */, Try<ItT>&& t) {
870 folly::MoveWrapper<Try<ItT>> mt(std::move(t));
871 // Futures can be completed in any order, simultaneously.
872 // To make this non-blocking, we create a new Future chain in
873 // the order of completion to reduce the values.
874 // The spinlock just protects chaining a new Future, not actually
875 // executing the reduce, which should be really fast.
876 folly::MSLGuard lock(ctx->lock_);
877 ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
878 // Either return a ItT&& or a Try<ItT>&& depending
879 // on the type of the argument of func.
880 return ctx->func_(std::move(v),
881 mt->template get<IsTry::value, Arg&&>());
883 if (++ctx->numThens_ == ctx->numFutures_) {
884 // After reducing the value of the last Future, fulfill the Promise
885 ctx->memo_.setCallback_(
886 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
890 return ctx->promise_.getFuture();
896 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
897 return within(dur, TimedOut(), tk);
902 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
905 Context(E ex) : exception(std::move(ex)), promise() {}
907 Future<Unit> thisFuture;
909 std::atomic<bool> token {false};
912 std::shared_ptr<Timekeeper> tks;
914 tks = folly::detail::getTimekeeperSingleton();
915 tk = DCHECK_NOTNULL(tks.get());
918 auto ctx = std::make_shared<Context>(std::move(e));
920 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
921 // TODO: "this" completed first, cancel "after"
922 if (ctx->token.exchange(true) == false) {
923 ctx->promise.setTry(std::move(t));
927 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
928 // "after" completed first, cancel "this"
929 ctx->thisFuture.raise(TimedOut());
930 if (ctx->token.exchange(true) == false) {
931 if (t.hasException()) {
932 ctx->promise.setException(std::move(t.exception()));
934 ctx->promise.setException(std::move(ctx->exception));
939 return ctx->promise.getFuture().via(getExecutor());
945 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
946 return collectAll(*this, futures::sleep(dur, tk))
947 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
948 Try<T>& t = std::get<0>(tup);
949 return makeFuture<T>(std::move(t));
956 void waitImpl(Future<T>& f) {
957 // short-circuit if there's nothing to do
958 if (f.isReady()) return;
960 FutureBatonType baton;
961 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
967 void waitImpl(Future<T>& f, Duration dur) {
968 // short-circuit if there's nothing to do
969 if (f.isReady()) return;
971 folly::MoveWrapper<Promise<T>> promise;
972 auto ret = promise->getFuture();
973 auto baton = std::make_shared<FutureBatonType>();
974 f.setCallback_([baton, promise](Try<T>&& t) mutable {
975 promise->setTry(std::move(t));
979 if (baton->timed_wait(dur)) {
985 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
986 // Set callback so to ensure that the via executor has something on it
987 // so that once the preceding future triggers this callback, drive will
988 // always have a callback to satisfy it
991 f = f.then([](T&& t) { return std::move(t); });
992 while (!f.isReady()) {
1001 Future<T>& Future<T>::wait() & {
1002 detail::waitImpl(*this);
1007 Future<T>&& Future<T>::wait() && {
1008 detail::waitImpl(*this);
1009 return std::move(*this);
1013 Future<T>& Future<T>::wait(Duration dur) & {
1014 detail::waitImpl(*this, dur);
1019 Future<T>&& Future<T>::wait(Duration dur) && {
1020 detail::waitImpl(*this, dur);
1021 return std::move(*this);
1025 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1026 detail::waitViaImpl(*this, e);
1031 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1032 detail::waitViaImpl(*this, e);
1033 return std::move(*this);
1037 T Future<T>::get() {
1038 return std::move(wait().value());
1042 T Future<T>::get(Duration dur) {
1045 return std::move(value());
1052 T Future<T>::getVia(DrivableExecutor* e) {
1053 return std::move(waitVia(e).value());
1059 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1060 return t1.value() == t2.value();
1066 Future<bool> Future<T>::willEqual(Future<T>& f) {
1067 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1068 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1069 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1078 Future<T> Future<T>::filter(F predicate) {
1079 auto p = folly::makeMoveWrapper(std::move(predicate));
1080 return this->then([p](T val) {
1081 T const& valConstRef = val;
1082 if (!(*p)(valConstRef)) {
1083 throw PredicateDoesNotObtain();
1090 template <class Callback>
1091 auto Future<T>::thenMulti(Callback&& fn)
1092 -> decltype(this->then(std::forward<Callback>(fn))) {
1093 // thenMulti with one callback is just a then
1094 return then(std::forward<Callback>(fn));
1098 template <class Callback, class... Callbacks>
1099 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1100 -> decltype(this->then(std::forward<Callback>(fn)).
1101 thenMulti(std::forward<Callbacks>(fns)...)) {
1102 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1103 return then(std::forward<Callback>(fn)).
1104 thenMulti(std::forward<Callbacks>(fns)...);
1108 template <class Callback, class... Callbacks>
1109 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1111 -> decltype(this->then(std::forward<Callback>(fn)).
1112 thenMulti(std::forward<Callbacks>(fns)...)) {
1113 // thenMultiExecutor with two callbacks is
1114 // via(x).then(a).thenMulti(b, ...).via(oldX)
1115 auto oldX = getExecutor();
1117 return then(std::forward<Callback>(fn)).
1118 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
1122 template <class Callback>
1123 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1124 -> decltype(this->then(std::forward<Callback>(fn))) {
1125 // thenMulti with one callback is just a then with an executor
1126 return then(x, std::forward<Callback>(fn));
1130 inline Future<Unit> when(bool p, F thunk) {
1131 return p ? thunk().unit() : makeFuture();
1134 template <class P, class F>
1135 Future<Unit> whileDo(P predicate, F thunk) {
1137 return thunk().then([=] {
1138 return whileDo(predicate, thunk);
1141 return makeFuture();
1145 Future<Unit> times(const int n, F thunk) {
1146 auto count = folly::makeMoveWrapper(
1147 std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1149 return folly::whileDo([=]() mutable {
1150 return (*count)->fetch_add(1) < n;
1155 template <class It, class F, class ItT, class Result>
1156 std::vector<Future<Result>> map(It first, It last, F func) {
1157 std::vector<Future<Result>> results;
1158 for (auto it = first; it != last; it++) {
1159 results.push_back(it->then(func));
1169 struct retrying_policy_raw_tag {};
1170 struct retrying_policy_fut_tag {};
1172 template <class Policy>
1173 struct retrying_policy_traits {
1174 using ew = exception_wrapper;
1175 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1176 template <class Ret>
1177 using has_op = typename std::integral_constant<bool,
1178 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1179 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1180 using is_raw = has_op<bool>;
1181 using is_fut = has_op<Future<bool>>;
1182 using tag = typename std::conditional<
1183 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1184 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1187 template <class Policy, class FF>
1188 typename std::result_of<FF(size_t)>::type
1189 retrying(size_t k, Policy&& p, FF&& ff) {
1190 using F = typename std::result_of<FF(size_t)>::type;
1191 using T = typename F::value_type;
1193 auto pm = makeMoveWrapper(p);
1194 auto ffm = makeMoveWrapper(ff);
1195 return f.onError([=](exception_wrapper x) mutable {
1196 auto q = (*pm)(k, x);
1197 auto xm = makeMoveWrapper(std::move(x));
1198 return q.then([=](bool r) mutable {
1200 ? retrying(k, pm.move(), ffm.move())
1201 : makeFuture<T>(xm.move());
1206 template <class Policy, class FF>
1207 typename std::result_of<FF(size_t)>::type
1208 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1209 auto pm = makeMoveWrapper(std::move(p));
1210 auto q = [=](size_t k, exception_wrapper x) {
1211 return makeFuture<bool>((*pm)(k, x));
1213 return retrying(0, std::move(q), std::forward<FF>(ff));
1216 template <class Policy, class FF>
1217 typename std::result_of<FF(size_t)>::type
1218 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1219 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1222 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1223 template <class URNG>
1224 Duration retryingJitteredExponentialBackoffDur(
1226 Duration backoff_min,
1227 Duration backoff_max,
1228 double jitter_param,
1231 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1232 auto jitter = std::exp(dist(rng));
1233 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1234 return std::max(backoff_min, std::min(backoff_max, backoff));
1237 template <class Policy, class URNG>
1238 std::function<Future<bool>(size_t, const exception_wrapper&)>
1239 retryingPolicyCappedJitteredExponentialBackoff(
1241 Duration backoff_min,
1242 Duration backoff_max,
1243 double jitter_param,
1246 auto pm = makeMoveWrapper(std::move(p));
1247 auto rngp = std::make_shared<URNG>(std::move(rng));
1248 return [=](size_t n, const exception_wrapper& ex) mutable {
1249 if (n == max_tries) { return makeFuture(false); }
1250 return (*pm)(n, ex).then([=](bool v) {
1251 if (!v) { return makeFuture(false); }
1252 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1253 n, backoff_min, backoff_max, jitter_param, *rngp);
1254 return futures::sleep(backoff).then([] { return true; });
1259 template <class Policy, class URNG>
1260 std::function<Future<bool>(size_t, const exception_wrapper&)>
1261 retryingPolicyCappedJitteredExponentialBackoff(
1263 Duration backoff_min,
1264 Duration backoff_max,
1265 double jitter_param,
1268 retrying_policy_raw_tag) {
1269 auto pm = makeMoveWrapper(std::move(p));
1270 auto q = [=](size_t n, const exception_wrapper& e) {
1271 return makeFuture((*pm)(n, e));
1273 return retryingPolicyCappedJitteredExponentialBackoff(
1282 template <class Policy, class URNG>
1283 std::function<Future<bool>(size_t, const exception_wrapper&)>
1284 retryingPolicyCappedJitteredExponentialBackoff(
1286 Duration backoff_min,
1287 Duration backoff_max,
1288 double jitter_param,
1291 retrying_policy_fut_tag) {
1292 return retryingPolicyCappedJitteredExponentialBackoff(
1303 template <class Policy, class FF>
1304 typename std::result_of<FF(size_t)>::type
1305 retrying(Policy&& p, FF&& ff) {
1306 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1307 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1311 std::function<bool(size_t, const exception_wrapper&)>
1312 retryingPolicyBasic(
1314 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1317 template <class Policy, class URNG>
1318 std::function<Future<bool>(size_t, const exception_wrapper&)>
1319 retryingPolicyCappedJitteredExponentialBackoff(
1321 Duration backoff_min,
1322 Duration backoff_max,
1323 double jitter_param,
1326 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1327 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1338 std::function<Future<bool>(size_t, const exception_wrapper&)>
1339 retryingPolicyCappedJitteredExponentialBackoff(
1341 Duration backoff_min,
1342 Duration backoff_max,
1343 double jitter_param) {
1344 auto p = [](size_t, const exception_wrapper&) { return true; };
1345 return retryingPolicyCappedJitteredExponentialBackoff(
1356 // Instantiate the most common Future types to save compile time
1357 extern template class Future<Unit>;
1358 extern template class Future<bool>;
1359 extern template class Future<int>;
1360 extern template class Future<int64_t>;
1361 extern template class Future<std::string>;
1362 extern template class Future<double>;
1364 } // namespace folly