2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
// Decide whether futures use the fiber-aware baton: the flag is defined as 0
// when FOLLY_MOBILE is set or __APPLE__ is defined, and as 1 (together with
// the fibers Baton include) in the other branch.
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/experimental/fibers/Baton.h>
// FutureBatonType is the baton type used by the blocking wait helpers
// (waitImpl): folly::fibers::Baton when the flag is set, folly::Baton<>
// otherwise.
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
// Declaration only (defined elsewhere). Returns shared ownership of a
// Timekeeper; within() calls it as folly::detail::getTimekeeperSingleton()
// to obtain a Timekeeper.
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
// Move constructor: copies other's core pointer, then nulls other.core_ so
// that only this future refers to the core.
55 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
56 other.core_ = nullptr;
// Move assignment implemented as a swap of the two core pointers; `other`
// is left holding whatever core this future had before the call.
60 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
61 std::swap(core_, other.core_);
// Converting move constructor from Future<U>: the core pointer is obtained
// through detail::Core<T>::convert(other.core_), after which other.core_ is
// nulled. The unnamed second template parameter is presumably an SFINAE
// constraint declared with the class -- confirm.
66 template <typename U, typename>
67 Future<T>::Future(Future<U>&& other) noexcept
68 : core_(detail::Core<T>::convert(other.core_)) {
69 other.core_ = nullptr;
// Converting move assignment from Future<U>.
// NOTE(review): std::swap takes both arguments by non-const lvalue
// reference, so this line only compiles if Core<T>::convert returns an
// lvalue reference -- confirm against Core.h.
73 template <typename U, typename>
74 Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
75 std::swap(core_, detail::Core<T>::convert(other.core_));
// Constructs a future from a value: allocates a new core initialised with
// Try<T>(val), perfectly forwarding val.
80 template <class T2, typename>
81 Future<T>::Future(T2&& val)
82 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Constructor whose core is initialised with a value-initialised T, i.e.
// Try<T>(T()). Both template parameters are unnamed; presumably they carry
// an enable_if constraint from the class declaration -- confirm.
85 template <typename, typename>
87 : core_(new detail::Core<T>(Try<T>(T()))) {}
// Destructor. NOTE(review): presumably releases the core by calling
// detach() -- confirm against the body.
90 Future<T>::~Future() {
// Notifies the core that the future side is letting go of it
// (Core::detachFuture).
95 void Future<T>::detach() {
97 core_->detachFuture();
// Const validity check. Judging by the name it throws when this future has
// no usable core -- TODO confirm against the body.
103 void Future<T>::throwIfInvalid() const {
// Installs func as the core's callback. func is moved into the core, so the
// caller's object should be treated as moved-from afterwards.
110 void Future<T>::setCallback_(F&& func) {
112 core_->setCallback(std::move(func));
// Flattens a nested future: only enabled when isFuture<F>::value holds, and
// yields Future<isFuture<T>::Inner>. Implemented as a then() whose callback
// receives the inner future and returns it unchanged.
119 typename std::enable_if<isFuture<F>::value,
120 Future<typename isFuture<T>::Inner>>::type
121 Future<T>::unwrap() {
122 return then([](Future<typename isFuture<T>::Inner> internal_future) {
123 return internal_future;
129 // Variant: returns a value
130 // e.g. f.then([](Try<T>&& t){ return t.value(); });
// Selected by enable_if when R::ReturnsFuture::value is false. isTry says
// whether the callback takes a Try<T> (true) or the unwrapped value (false);
// Args holds at most one argument type (enforced by the static_assert).
132 template <typename F, typename R, bool isTry, typename... Args>
133 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
134 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
135 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
// B is the value type carried by the future that this call returns.
136 typedef typename R::ReturnsFuture::Inner B;
140 // wrap these so we can move them into the lambda
141 folly::MoveWrapper<Promise<B>> p;
// The new promise's core reuses this future's interrupt handler.
142 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
143 folly::MoveWrapper<F> funcm(std::forward<F>(func));
145 // grab the Future now before we lose our handle on the Promise
146 auto f = p->getFuture();
// The returned future is given the same executor as this one.
147 f.core_->setExecutorNoLock(getExecutor());
149 /* This is a bit tricky.
151 We can't just close over *this in case this Future gets moved. So we
152 make a new dummy Future. We could figure out something more
153 sophisticated that avoids making a new Future object when it can, as an
154 optimization. But this is correct.
156 core_ can't be moved, it is explicitly disallowed (as is copying). But
157 if there's ever a reason to allow it, this is one place that makes that
158 assumption and would need to be fixed. We use a standard shared pointer
159 for core_ (by copying it in), which means in essence obj holds a shared
160 pointer to itself. But this shouldn't leak because Promise will not
161 outlive the continuation, because Promise will setException() with a
162 broken Promise if it is destructed before completed. We could use a
163 weak pointer but it would have to be converted to a shared pointer when
164 func is executed (because the Future returned by func may possibly
165 persist beyond the callback, if it gets moved), and so it is an
166 optimization to just make it shared from the get-go.
168 We have to move in the Promise and func using the MoveWrapper
169 hack. (func could be copied but it's a big drag on perf).
171 Two subtle but important points about this design. detail::Core has no
172 back pointers to Future or Promise, so if Future or Promise get moved
173 (and they will be moved in performant code) we don't have to do
174 anything fancy. And because we store the continuation in the
175 detail::Core, not in the Future, we can execute the continuation even
176 after the Future has gone out of scope. This is an intentional design
177 decision. It is likely we will want to be able to cancel a continuation
178 in some circumstances, but I think it should be explicit not implicit
179 in the destruction of the Future used to create it.
// Continuation body: when the callback wants a plain value (isTry == false)
// and the upstream Try holds an exception, that exception is forwarded to p.
// The callback result is computed from t via get<isTry, Args>().
182 [p, funcm](Try<T>&& t) mutable {
183 if (!isTry && t.hasException()) {
184 p->setException(std::move(t.exception()));
187 return (*funcm)(t.template get<isTry, Args>()...);
195 // Variant: returns a Future
196 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
// Selected by enable_if when R::ReturnsFuture::value is true. Setup mirrors
// the value-returning variant: a new promise inherits this future's
// interrupt handler, and the returned future inherits the executor.
198 template <typename F, typename R, bool isTry, typename... Args>
199 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
200 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
201 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
202 typedef typename R::ReturnsFuture::Inner B;
206 // wrap these so we can move them into the lambda
207 folly::MoveWrapper<Promise<B>> p;
208 p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
209 folly::MoveWrapper<F> funcm(std::forward<F>(func));
211 // grab the Future now before we lose our handle on the Promise
212 auto f = p->getFuture();
213 f.core_->setExecutorNoLock(getExecutor());
// Continuation body: an upstream exception is forwarded to p when the
// callback wants a plain value. The callback's own future f2 is chained so
// that its result is copied into p with setTry.
216 [p, funcm](Try<T>&& t) mutable {
217 if (!isTry && t.hasException()) {
218 p->setException(std::move(t.exception()));
221 auto f2 = (*funcm)(t.template get<isTry, Args>()...);
222 // that didn't throw, now we can steal p
223 f2.setCallback_([p](Try<B>&& b) mutable {
224 p->setTry(std::move(b));
// The handlers below turn an exception into a failed result on p; the
// std::exception overload also records the typed exception object.
226 } catch (const std::exception& e) {
227 p->setException(exception_wrapper(std::current_exception(), e));
229 p->setException(exception_wrapper(std::current_exception()));
// then() overload for a pointer-to-member-function plus the object to call
// it on. `instance` is captured as a raw pointer, so it must stay alive
// until the continuation has run.
237 template <typename T>
238 template <typename R, typename Caller, typename... Args>
239 Future<typename isFuture<R>::Inner>
240 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
// FirstArg is the member function's first parameter type with references
// and cv-qualifiers stripped; isTry<FirstArg> decides whether the Try itself
// or its value is passed on.
241 typedef typename std::remove_cv<
242 typename std::remove_reference<
243 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
244 return then([instance, func](Try<T>&& t){
245 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// then() overload that takes an executor ahead of the usual then()
// arguments; the return type is whatever the forwarded then() call returns.
// The current executor is saved in oldX before the forwarded call.
// NOTE(review): the expression continues after the trailing '.'; it
// presumably switches the result back to oldX (compare
// thenMultiWithExecutor) -- confirm.
250 template <class Executor, class Arg, class... Args>
251 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
252 -> decltype(this->then(std::forward<Arg>(arg),
253 std::forward<Args>(args)...))
255 auto oldX = getExecutor();
257 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
// Argument-less then(): chains an empty callback, producing a Future<Unit>.
262 Future<Unit> Future<T>::then() {
263 return then([] () {});
266 // onError where the callback returns T
// Enabled when F is not callable with exception_wrapper and does not return
// a Future.
269 typename std::enable_if<
270 !detail::callableWith<F, exception_wrapper>::value &&
271 !detail::Extract<F>::ReturnsFuture::value,
273 Future<T>::onError(F&& func) {
// Exn is the callback's first parameter type; it selects which exceptions
// the handler applies to.
274 typedef typename detail::Extract<F>::FirstArg Exn;
276 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
277 "Return type of onError callback must be T or Future<T>");
// The promise behind the returned future reuses this future's interrupt
// handler.
280 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
281 auto f = p.getFuture();
282 auto pm = folly::makeMoveWrapper(std::move(p));
283 auto funcm = folly::makeMoveWrapper(std::move(func));
// withException<Exn> runs the inner lambda only when the Try holds an
// exception of type Exn; when it reports false, the Try is passed through
// to the promise untouched.
284 setCallback_([pm, funcm](Try<T>&& t) mutable {
285 if (!t.template withException<Exn>([&] (Exn& e) {
290 pm->setTry(std::move(t));
297 // onError where the callback returns Future<T>
// Enabled when F is not callable with exception_wrapper and returns a
// Future. NOTE(review): unlike the T-returning overload, no
// setInterruptHandlerNoLock call is visible here -- confirm whether interrupt
// propagation is intentionally absent.
300 typename std::enable_if<
301 !detail::callableWith<F, exception_wrapper>::value &&
302 detail::Extract<F>::ReturnsFuture::value,
304 Future<T>::onError(F&& func) {
306 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
307 "Return type of onError callback must be T or Future<T>");
308 typedef typename detail::Extract<F>::FirstArg Exn;
311 auto f = p.getFuture();
312 auto pm = folly::makeMoveWrapper(std::move(p));
313 auto funcm = folly::makeMoveWrapper(std::move(func));
// For a matching exception the handler's future f2 is chained so that its
// result lands in the promise; exceptions caught by the handlers below are
// stored in the promise instead. A non-matching Try is passed through
// unchanged.
314 setCallback_([pm, funcm](Try<T>&& t) mutable {
315 if (!t.template withException<Exn>([&] (Exn& e) {
317 auto f2 = (*funcm)(e);
318 f2.setCallback_([pm](Try<T>&& t2) mutable {
319 pm->setTry(std::move(t2));
321 } catch (const std::exception& e2) {
322 pm->setException(exception_wrapper(std::current_exception(), e2));
324 pm->setException(exception_wrapper(std::current_exception()));
327 pm->setTry(std::move(t));
// Chains a continuation that carries func (via MoveWrapper) and then
// re-emits the incoming Try with makeFuture, so the original value or
// exception passes through. func is presumably invoked before the
// pass-through -- confirm.
336 Future<T> Future<T>::ensure(F func) {
337 MoveWrapper<F> funcw(std::move(func));
338 return this->then([funcw](Try<T>&& t) mutable {
340 return makeFuture(std::move(t));
// Applies within(dur, tk) and installs an onError handler for TimedOut that
// substitutes the result of calling func().
346 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
347 auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
348 return within(dur, tk)
349 .onError([funcw](TimedOut const&) { return (*funcw)(); });
// onError overload for handlers callable with an exception_wrapper that
// return a Future. The handler therefore sees every exception, not just one
// type. NOTE(review): no interrupt-handler propagation is visible here,
// unlike the typed, T-returning overload -- confirm.
354 typename std::enable_if<
355 detail::callableWith<F, exception_wrapper>::value &&
356 detail::Extract<F>::ReturnsFuture::value,
358 Future<T>::onError(F&& func) {
360 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
361 "Return type of onError callback must be T or Future<T>");
364 auto f = p.getFuture();
365 auto pm = folly::makeMoveWrapper(std::move(p));
366 auto funcm = folly::makeMoveWrapper(std::move(func));
// The callback takes the Try by value. On an exception the handler is called
// with the moved exception_wrapper and its future f2 is chained into the
// promise; exceptions caught by the handlers below are stored in the promise.
// A Try without an exception is passed through.
367 setCallback_([pm, funcm](Try<T> t) mutable {
368 if (t.hasException()) {
370 auto f2 = (*funcm)(std::move(t.exception()));
371 f2.setCallback_([pm](Try<T> t2) mutable {
372 pm->setTry(std::move(t2));
374 } catch (const std::exception& e2) {
375 pm->setException(exception_wrapper(std::current_exception(), e2));
377 pm->setException(exception_wrapper(std::current_exception()));
380 pm->setTry(std::move(t));
387 // onError(exception_wrapper) that returns T
// Enabled for handlers callable with an exception_wrapper that do not return
// a Future. The static_assert compares Extract<F>::Return (not RawReturn)
// with Future<T>; presumably Return is the Future-wrapped form of the
// handler's result -- confirm against detail::Extract.
390 typename std::enable_if<
391 detail::callableWith<F, exception_wrapper>::value &&
392 !detail::Extract<F>::ReturnsFuture::value,
394 Future<T>::onError(F&& func) {
396 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
397 "Return type of onError callback must be T or Future<T>");
400 auto f = p.getFuture();
401 auto pm = folly::makeMoveWrapper(std::move(p));
402 auto funcm = folly::makeMoveWrapper(std::move(func));
// On an exception the handler's return value is computed from the moved
// exception_wrapper; otherwise the Try is passed through to the promise.
403 setCallback_([pm, funcm](Try<T> t) mutable {
404 if (t.hasException()) {
406 return (*funcm)(std::move(t.exception()));
409 pm->setTry(std::move(t));
// Returns an lvalue reference to the value stored in the core's Try.
// Presumably Try::value() throws if the Try holds an exception -- confirm.
417 typename std::add_lvalue_reference<T>::type Future<T>::value() {
420 return core_->getTry().value();
// Const overload: same access path, returning a reference to const T.
424 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
427 return core_->getTry().value();
// Returns a reference to the Try stored in the core (value or exception).
431 Try<T>& Future<T>::getTry() {
434 return core_->getTry();
// Non-blocking readiness probe returning Optional<Try<T>>. When the core is
// ready, its Try is moved (not copied) into `o`, leaving the core's Try in a
// moved-from state.
438 Optional<Try<T>> Future<T>::poll() {
440 if (core_->ready()) {
441 o = std::move(core_->getTry());
// Rvalue-qualified via(): sets the executor and priority on this future and
// returns it by move, so no new future is created.
447 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
450 setExecutor(executor, priority);
452 return std::move(*this);
// Lvalue-qualified via(): chains a continuation on this future that forwards
// its Try into a fresh promise, then applies the rvalue via() to that
// promise's future and returns it.
456 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
459 MoveWrapper<Promise<T>> p;
460 auto f = p->getFuture();
461 then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
462 return std::move(f).via(executor, priority);
// Free-function shorthand for via(x).then(func): func is chained onto a
// future bound to executor x. The result type is the Future of func's return
// type, with any Future layer stripped by isFuture<...>::Inner.
466 template <class Func>
467 auto via(Executor* x, Func func)
468 -> Future<typename isFuture<decltype(func())>::Inner>
470 // TODO make this actually more performant. :-P #7260175
471 return via(x).then(func);
// Reports whether the core is ready (Core::ready()).
475 bool Future<T>::isReady() const {
477 return core_->ready();
// True when the stored Try holds a value. Goes through getTry().
481 bool Future<T>::hasValue() {
482 return getTry().hasValue();
// True when the stored Try holds an exception. Goes through getTry().
486 bool Future<T>::hasException() {
487 return getTry().hasException();
// Hands `exception` to the core's raise(); the wrapper is moved, not copied.
491 void Future<T>::raise(exception_wrapper exception) {
492 core_->raise(std::move(exception));
// Builds a future from a value: t is forwarded into a Try of the decayed
// type, which is then passed to the Try-taking makeFuture overload.
498 Future<typename std::decay<T>::type> makeFuture(T&& t) {
499 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Argument-less makeFuture(): a Future<Unit> built from Unit{}.
502 inline // for multiple translation units
503 Future<Unit> makeFuture() {
504 return makeFuture(Unit{});
507 // makeFutureWith(Future<T>()) -> Future<T>
// Enabled when func() returns a Future; the return type is that same Future
// type. InnerType is the value type inside it.
509 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
510 typename std::result_of<F()>::type>::type
511 makeFutureWith(F&& func) {
513 typename isFuture<typename std::result_of<F()>::type>::Inner;
// Caught exceptions (presumably from invoking func -- confirm) become a
// failed future of InnerType rather than propagating to the caller.
// NOTE(review): this handler catches std::exception by non-const reference,
// while other handlers in this file use const references.
516 } catch (std::exception& e) {
517 return makeFuture<InnerType>(
518 exception_wrapper(std::current_exception(), e));
520 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
524 // makeFutureWith(T()) -> Future<T>
525 // makeFutureWith(void()) -> Future<Unit>
// Enabled when func() does not return a Future. Unit::Lift maps the result
// type to LiftedResult (void becomes Unit, per the comment above).
527 typename std::enable_if<
528 !(isFuture<typename std::result_of<F()>::type>::value),
529 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
530 makeFutureWith(F&& func) {
532 typename Unit::Lift<typename std::result_of<F()>::type>::type;
// makeTryWith produces a Try from the wrapped lambda, which captures func by
// reference; that Try is wrapped in a future.
533 return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
// Builds a Future<T> from an exception_ptr by wrapping it in a Try<T>.
539 Future<T> makeFuture(std::exception_ptr const& e) {
540 return makeFuture(Try<T>(e));
// Builds a Future<T> from an exception_wrapper, which is moved into the Try.
544 Future<T> makeFuture(exception_wrapper ew) {
545 return makeFuture(Try<T>(std::move(ew)));
// Builds a Future<T> from an exception object. Only enabled when E derives
// from std::exception; e is copied into an exception_wrapper via
// make_exception_wrapper<E>.
548 template <class T, class E>
549 typename std::enable_if<std::is_base_of<std::exception, E>::value,
551 makeFuture(E const& e) {
552 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// The overload the other makeFuture variants funnel into: allocates a new
// core holding the moved Try and wraps it in a Future<T>.
556 Future<T> makeFuture(Try<T>&& t) {
557 return Future<T>(new detail::Core<T>(std::move(t)));
// Returns a Future<Unit> from makeFuture() bound to `executor` with the given
// priority; a starting point for chaining work onto an executor.
561 Future<Unit> via(Executor* executor, int8_t priority) {
562 return makeFuture().via(executor, priority);
565 // mapSetCallback calls func(i, Try<T>) when every future completes
// For each future in [first, last) a callback is installed that calls
// func(i, Try) with that future's zero-based position i. func is copied into
// every callback, so shared state must live behind a handle captured by func.
567 template <class T, class InputIterator, class F>
568 void mapSetCallback(InputIterator first, InputIterator last, F func) {
569 for (size_t i = 0; first != last; ++first, ++i) {
570 first->setCallback_([func, i](Try<T>&& t) {
571 func(i, std::move(t));
576 // collectAll (variadic)
// Creates a shared CollectAllVariadicContext over the value types of the
// given futures, lets collectVariadicHelper wire the futures to it, and
// returns the future of the context's promise.
578 template <typename... Fs>
579 typename detail::CollectAllVariadicContext<
580 typename std::decay<Fs>::type::value_type...>::type
581 collectAll(Fs&&... fs) {
582 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
583 typename std::decay<Fs>::type::value_type...>>();
584 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
585 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
586 return ctx->p.getFuture();
589 // collectAll (iterator)
// Collects the Try of every future in [first, last) into one vector, indexed
// by input position.
591 template <class InputIterator>
594 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
595 collectAll(InputIterator first, InputIterator last) {
597 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// The promise is fulfilled from the context's destructor. Each callback holds
// a shared_ptr to the context, so the destructor runs once no callback (and
// no other holder) references it any more.
599 struct CollectAllContext {
600 CollectAllContext(int n) : results(n) {}
601 ~CollectAllContext() {
602 p.setValue(std::move(results));
604 Promise<std::vector<Try<T>>> p;
605 std::vector<Try<T>> results;
// NOTE(review): std::distance yields a difference_type that is narrowed to
// the constructor's int parameter.
608 auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
609 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
610 ctx->results[i] = std::move(t);
612 return ctx->p.getFuture();
615 // collect (iterator)
// Shared state for the iterator form of collect(). Result is the
// user-visible result type and InternalResult the staging storage; both are
// chosen with std::conditional on whether T is void. In the non-void case
// they are std::vector<T> and std::vector<Optional<T>>.
619 template <typename T>
620 struct CollectContext {
622 explicit Nothing(int /* n */) {}
625 using Result = typename std::conditional<
626 std::is_void<T>::value,
628 std::vector<T>>::type;
630 using InternalResult = typename std::conditional<
631 std::is_void<T>::value,
633 std::vector<Optional<T>>>::type;
635 explicit CollectContext(int n) : result(n) {}
// Publishes the final vector only if `threw` was not already set; the
// exchange sets the flag atomically, so the promise is completed at most once
// between this path and the exception path in collect().
637 if (!threw.exchange(true)) {
638 // map Optional<T> -> T
639 std::vector<T> finalResult;
640 finalResult.reserve(result.size());
641 std::transform(result.begin(), result.end(),
642 std::back_inserter(finalResult),
643 [](Optional<T>& o) { return std::move(o.value()); });
644 p.setValue(std::move(finalResult));
// Stores the value of future i by moving it out of its Try.
647 inline void setPartialResult(size_t i, Try<T>& t) {
648 result[i] = std::move(t.value());
651 InternalResult result;
652 std::atomic<bool> threw {false};
// Like collectAll, but the result is completed with the first exception seen:
// the first failing input sets `threw` and forwards its exception to the
// promise. Successful inputs are stored by index for as long as `threw` is
// still false.
657 template <class InputIterator>
658 Future<typename detail::CollectContext<
659 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
660 collect(InputIterator first, InputIterator last) {
662 typename std::iterator_traits<InputIterator>::value_type::value_type T;
664 auto ctx = std::make_shared<detail::CollectContext<T>>(
665 std::distance(first, last));
666 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
667 if (t.hasException()) {
// exchange(true) returns the previous flag, so only the first exception wins.
668 if (!ctx->threw.exchange(true)) {
669 ctx->p.setException(std::move(t.exception()));
671 } else if (!ctx->threw) {
672 ctx->setPartialResult(i, t);
675 return ctx->p.getFuture();
678 // collect (variadic)
// Variadic counterpart of collect(): same wiring as the variadic collectAll,
// but with CollectVariadicContext as the shared state.
680 template <typename... Fs>
681 typename detail::CollectVariadicContext<
682 typename std::decay<Fs>::type::value_type...>::type
683 collect(Fs&&... fs) {
684 auto ctx = std::make_shared<detail::CollectVariadicContext<
685 typename std::decay<Fs>::type::value_type...>>();
686 detail::collectVariadicHelper<detail::CollectVariadicContext>(
687 ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
688 return ctx->p.getFuture();
691 // collectAny (iterator)
// Completes with (index, Try) of the first input future to finish, whether it
// finished with a value or an exception.
693 template <class InputIterator>
698 std::iterator_traits<InputIterator>::value_type::value_type>>>
699 collectAny(InputIterator first, InputIterator last) {
701 typename std::iterator_traits<InputIterator>::value_type::value_type T;
703 struct CollectAnyContext {
704 CollectAnyContext() {};
705 Promise<std::pair<size_t, Try<T>>> p;
706 std::atomic<bool> done {false};
709 auto ctx = std::make_shared<CollectAnyContext>();
// done.exchange(true) lets exactly one callback fulfil the promise; later
// completions are ignored.
710 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
711 if (!ctx->done.exchange(true)) {
712 ctx->p.setValue(std::make_pair(i, std::move(t)));
715 return ctx->p.getFuture();
718 // collectN (iterator)
// Completes once n of the input futures have completed, yielding their
// (index, Try) pairs.
720 template <class InputIterator>
721 Future<std::vector<std::pair<size_t, Try<typename
722 std::iterator_traits<InputIterator>::value_type::value_type>>>>
723 collectN(InputIterator first, InputIterator last, size_t n) {
725 std::iterator_traits<InputIterator>::value_type::value_type T;
726 typedef std::vector<std::pair<size_t, Try<T>>> V;
728 struct CollectNContext {
730 std::atomic<size_t> completed = {0};
733 auto ctx = std::make_shared<CollectNContext>();
// Fewer than n inputs can never satisfy the request, so the promise is failed
// with a runtime_error in that case.
735 if (size_t(std::distance(first, last)) < n) {
736 ctx->p.setException(std::runtime_error("Not enough futures"));
738 // for each completed Future, increase count and add to vector, until we
739 // have n completed futures at which point we fulfil our Promise with the
// c is this callback's completion rank, taken from the atomic counter.
// NOTE(review): emplace_back on the shared vector is not visibly guarded by a
// lock here -- confirm how concurrent completions are serialised.
741 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
742 auto c = ++ctx->completed;
744 assert(ctx->v.size() < n);
745 ctx->v.emplace_back(i, std::move(t));
747 ctx->p.setTry(Try<V>(std::move(ctx->v)));
753 return ctx->p.getFuture();
// Folds the futures in [first, last) in input order, starting from `initial`.
758 template <class It, class T, class F>
759 Future<T> reduce(It first, It last, T&& initial, F&& func) {
// Early return of the initial value (presumably for an empty range --
// confirm).
761 return makeFuture(std::move(initial));
// Arg is Try<ItT> when func is callable with (T&&, Try<ItT>&&), otherwise the
// plain ItT; IsTry records which form was chosen.
764 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
765 typedef typename std::conditional<
766 detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
767 typedef isTry<Arg> IsTry;
// func is moved into a shared_ptr so that every step can share one instance.
769 folly::MoveWrapper<T> minitial(std::move(initial));
770 auto sfunc = std::make_shared<F>(std::move(func));
// First step: combine the initial value with the first future's result.
772 auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
773 return (*sfunc)(std::move(*minitial),
774 head.template get<IsTry::value, Arg&&>());
// Remaining steps: wait for both the accumulator and the next input, then
// combine them. The accumulator is read with Try::value().
777 for (++first; first != last; ++first) {
778 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
779 return (*sfunc)(std::move(std::get<0>(t).value()),
780 // Either return a ItT&& or a Try<ItT>&& depending
781 // on the type of the argument of func.
782 std::get<1>(t).template get<IsTry::value, Arg&&>());
789 // window (collection)
// Applies func to every element of input while starting only min(n, size)
// calls up front; each completion starts the next pending element. Returns
// one future per input element.
791 template <class Collection, class F, class ItT, class Result>
792 std::vector<Future<Result>>
793 window(Collection input, F func, size_t n) {
794 struct WindowContext {
795 WindowContext(Collection&& i, F&& fn)
796 : input_(std::move(i)), promises_(input_.size()),
// i_ is the index of the next element to start; it is claimed atomically in
// spawn().
799 std::atomic<size_t> i_ {0};
801 std::vector<Promise<Result>> promises_;
// Claims the next index and, if it is in range, runs func on that element and
// arranges for its result to fulfil the matching promise.
804 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
805 size_t i = ctx->i_++;
806 if (i < ctx->input_.size()) {
807 // Using setCallback_ directly since we don't need the Future
808 ctx->func_(std::move(ctx->input_[i])).setCallback_(
809 // ctx is captured by value
810 [ctx, i](Try<Result>&& t) {
811 ctx->promises_[i].setTry(std::move(t));
812 // Chain another future onto this one
// NOTE(review): the lambda is not mutable and spawn() takes a const
// reference, so std::move(ctx) has no effect here.
813 spawn(std::move(ctx));
// max must be computed before input is moved into the context below.
819 auto max = std::min(n, input.size());
821 auto ctx = std::make_shared<WindowContext>(
822 std::move(input), std::move(func));
824 for (size_t i = 0; i < max; ++i) {
825 // Start the first n Futures
826 WindowContext::spawn(ctx);
829 std::vector<Future<Result>> futures;
830 futures.reserve(ctx->promises_.size());
831 for (auto& promise : ctx->promises_) {
832 futures.emplace_back(promise.getFuture());
// Member reduce for a future whose value is an iterable collection: once the
// collection is available, func is folded over its elements starting from
// `initial`, moving both the accumulator and each element into func.
841 template <class I, class F>
842 Future<I> Future<T>::reduce(I&& initial, F&& func) {
843 folly::MoveWrapper<I> minitial(std::move(initial));
844 folly::MoveWrapper<F> mfunc(std::move(func));
845 return then([minitial, mfunc](T& vals) mutable {
846 auto ret = std::move(*minitial);
847 for (auto& val : vals) {
848 ret = (*mfunc)(std::move(ret), std::move(val));
854 // unorderedReduce (iterator)
// Reduces the futures in [first, last) in completion order rather than input
// order, so func should not depend on ordering.
856 template <class It, class T, class F, class ItT, class Arg>
857 Future<T> unorderedReduce(It first, It last, T initial, F func) {
// Early return of the initial value (presumably for an empty range --
// confirm).
859 return makeFuture(std::move(initial));
862 typedef isTry<Arg> IsTry;
// Shared state: memo_ starts as a future of the initial value and becomes the
// tail of the reduction chain.
864 struct UnorderedReduceContext {
865 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
866 : lock_(), memo_(makeFuture<T>(std::move(memo))),
867 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
869 folly::MicroSpinLock lock_; // protects memo_ and numThens_
872 size_t numThens_; // how many Futures completed and called .then()
873 size_t numFutures_; // how many Futures in total
877 auto ctx = std::make_shared<UnorderedReduceContext>(
878 std::move(initial), std::move(func), std::distance(first, last));
883 [ctx](size_t /* i */, Try<ItT>&& t) {
884 folly::MoveWrapper<Try<ItT>> mt(std::move(t));
885 // Futures can be completed in any order, simultaneously.
886 // To make this non-blocking, we create a new Future chain in
887 // the order of completion to reduce the values.
888 // The spinlock just protects chaining a new Future, not actually
889 // executing the reduce, which should be really fast.
890 folly::MSLGuard lock(ctx->lock_);
891 ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
892 // Either return a ItT&& or a Try<ItT>&& depending
893 // on the type of the argument of func.
894 return ctx->func_(std::move(v),
895 mt->template get<IsTry::value, Arg&&>());
// The last input to complete attaches the callback that publishes the result.
897 if (++ctx->numThens_ == ctx->numFutures_) {
898 // After reducing the value of the last Future, fulfill the Promise
899 ctx->memo_.setCallback_(
// NOTE(review): t2 is a Try<T> but is passed to setValue rather than setTry
// -- confirm this is intended.
900 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
904 return ctx->promise_.getFuture();
// Convenience overload: same as within(dur, TimedOut(), tk).
910 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
911 return within(dur, TimedOut(), tk);
// Races this future against a timer of length dur. Whichever finishes first
// completes the returned future: this future's Try, or an exception if the
// timer fires first.
916 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
919 Context(E ex) : exception(std::move(ex)), promise() {}
921 Future<Unit> thisFuture;
// token is flipped by whichever side finishes first, so the promise is
// completed exactly once.
923 std::atomic<bool> token {false};
// A Timekeeper obtained from the singleton is kept alive through tks for the
// rest of this call; DCHECK_NOTNULL guards against a null singleton.
926 std::shared_ptr<Timekeeper> tks;
928 tks = folly::detail::getTimekeeperSingleton();
929 tk = DCHECK_NOTNULL(tks.get());
932 auto ctx = std::make_shared<Context>(std::move(e));
934 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
935 // TODO: "this" completed first, cancel "after"
936 if (ctx->token.exchange(true) == false) {
937 ctx->promise.setTry(std::move(t));
941 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
942 // "after" completed first, cancel "this"
// Note: a TimedOut is always raised on the original future, independent of
// the caller-supplied exception e and of whether the token was already
// taken.
943 ctx->thisFuture.raise(TimedOut());
944 if (ctx->token.exchange(true) == false) {
// If the timer itself failed, that failure is reported; otherwise the
// caller-supplied exception is used.
945 if (t.hasException()) {
946 ctx->promise.setException(std::move(t.exception()));
948 ctx->promise.setException(std::move(ctx->exception));
// The result is handed back on this future's executor.
953 return ctx->promise.getFuture().via(getExecutor());
// Delays the result by waiting, via collectAll, for both this future and
// futures::sleep(dur, tk); only this future's Try is passed on. The sleep's
// Try is ignored.
959 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
960 return collectAll(*this, futures::sleep(dur, tk))
961 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
962 Try<T>& t = std::get<0>(tup);
963 return makeFuture<T>(std::move(t));
// Helper behind Future::wait(): installs a callback that posts a local baton
// when f completes. The callback captures the baton by reference, so the
// function presumably blocks on the baton before returning -- confirm.
970 void waitImpl(Future<T>& f) {
971 // short-circuit if there's nothing to do
972 if (f.isReady()) return;
974 FutureBatonType baton;
975 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
// Helper behind Future::wait(dur). The baton is heap-allocated and shared
// with the callback, so it stays valid even if this function returns before f
// completes. f's result is forwarded into a new promise whose future is `ret`.
981 void waitImpl(Future<T>& f, Duration dur) {
982 // short-circuit if there's nothing to do
983 if (f.isReady()) return;
985 folly::MoveWrapper<Promise<T>> promise;
986 auto ret = promise->getFuture();
987 auto baton = std::make_shared<FutureBatonType>();
988 f.setCallback_([baton, promise](Try<T>&& t) mutable {
989 promise->setTry(std::move(t));
// timed_wait's result selects the branch below; presumably true means the
// baton was posted within dur -- confirm.
993 if (baton->timed_wait(dur)) {
// Helper behind Future::waitVia(). f is replaced by a pass-through
// continuation and the loop repeats until f is ready; presumably each
// iteration drives the executor e -- confirm.
999 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1000 // Set callback so to ensure that the via executor has something on it
1001 // so that once the preceding future triggers this callback, drive will
1002 // always have a callback to satisfy it
1005 f = f.then([](T&& t) { return std::move(t); });
1006 while (!f.isReady()) {
1009 assert(f.isReady());
// Lvalue overload of wait(): delegates to detail::waitImpl and returns a
// reference to this future.
1015 Future<T>& Future<T>::wait() & {
1016 detail::waitImpl(*this);
// Rvalue overload: same delegation, but returns this future as an rvalue so
// calls can be chained on a temporary.
1021 Future<T>&& Future<T>::wait() && {
1022 detail::waitImpl(*this);
1023 return std::move(*this);
// Lvalue overload of the bounded wait: delegates to the Duration-taking
// detail::waitImpl.
1027 Future<T>& Future<T>::wait(Duration dur) & {
1028 detail::waitImpl(*this, dur);
// Rvalue overload: same delegation, returning this future as an rvalue.
1033 Future<T>&& Future<T>::wait(Duration dur) && {
1034 detail::waitImpl(*this, dur);
1035 return std::move(*this);
// Lvalue overload of waitVia(): delegates to detail::waitViaImpl with the
// given drivable executor.
1039 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1040 detail::waitViaImpl(*this, e);
// Rvalue overload: same delegation, returning this future as an rvalue.
1045 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1046 detail::waitViaImpl(*this, e);
1047 return std::move(*this);
// Blocking get: waits for completion, then moves the value out of the future.
// The stored value is left moved-from.
1051 T Future<T>::get() {
1052 return std::move(wait().value());
// Bounded get: the value is moved out of the future on the path that reaches
// this return. NOTE(review): presumably a bounded wait precedes it and a
// timeout is reported by exception -- confirm against the body.
1056 T Future<T>::get(Duration dur) {
1059 return std::move(value());
// Like get(), but waits through waitVia(e) before moving the value out.
1066 T Future<T>::getVia(DrivableExecutor* e) {
1067 return std::move(waitVia(e).value());
// Compares the values held by two Trys with operator==. It is called as
// detail::TryEquals<T>::equals from willEqual(), which calls it only after
// checking that both Trys hold values.
1073 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1074 return t1.value() == t2.value();
// Produces a Future<bool> comparing this future's eventual value with f's.
// The values are compared only when both completed with values; the result
// for the remaining cases is presumably false -- confirm.
1080 Future<bool> Future<T>::willEqual(Future<T>& f) {
1081 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1082 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1083 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
// Chains a predicate check: if predicate(value) is false, the continuation
// throws PredicateDoesNotObtain. The predicate sees the value through a const
// reference, so it cannot modify it.
1092 Future<T> Future<T>::filter(F predicate) {
1093 auto p = folly::makeMoveWrapper(std::move(predicate));
1094 return this->then([p](T val) {
1095 T const& valConstRef = val;
1096 if (!(*p)(valConstRef)) {
1097 throw PredicateDoesNotObtain();
// Base case of thenMulti: with a single callback this is just then(fn).
1104 template <class Callback>
1105 auto Future<T>::thenMulti(Callback&& fn)
1106 -> decltype(this->then(std::forward<Callback>(fn))) {
1107 // thenMulti with one callback is just a then
1108 return then(std::forward<Callback>(fn));
// Recursive case of thenMulti: chains the first callback with then() and
// recurses on the remaining callbacks, so callbacks run in argument order.
1112 template <class Callback, class... Callbacks>
1113 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1114 -> decltype(this->then(std::forward<Callback>(fn)).
1115 thenMulti(std::forward<Callbacks>(fns)...)) {
1116 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1117 return then(std::forward<Callback>(fn)).
1118 thenMulti(std::forward<Callbacks>(fns)...);
// Runs a chain of callbacks and switches the resulting future back to the
// executor that was current on entry (saved in oldX). Per the comment below,
// the chain itself runs on x.
1122 template <class Callback, class... Callbacks>
1123 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1125 -> decltype(this->then(std::forward<Callback>(fn)).
1126 thenMulti(std::forward<Callbacks>(fns)...)) {
1127 // thenMultiExecutor with two callbacks is
1128 // via(x).then(a).thenMulti(b, ...).via(oldX)
1129 auto oldX = getExecutor();
1131 return then(std::forward<Callback>(fn)).
1132 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
// Base case of thenMultiWithExecutor: forwards to the executor-taking then()
// overload.
1136 template <class Callback>
1137 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1138 -> decltype(this->then(std::forward<Callback>(fn))) {
1139 // thenMulti with one callback is just a then with an executor
1140 return then(x, std::forward<Callback>(fn));
// Conditionally runs thunk: when p is true the thunk's future is converted
// with unit(); otherwise a Future<Unit> from makeFuture() is returned without
// calling thunk.
1144 inline Future<Unit> when(bool p, F thunk) {
1145 return p ? thunk().unit() : makeFuture();
// Asynchronous loop: after each thunk() future completes, whileDo is invoked
// again with copies of predicate and thunk (captured by [=]). The final
// makeFuture() is the loop exit, presumably taken when predicate() is false
// -- confirm.
1148 template <class P, class F>
1149 Future<Unit> whileDo(P predicate, F thunk) {
1151 return thunk().then([=] {
1152 return whileDo(predicate, thunk);
1155 return makeFuture();
// Repeats thunk via whileDo with a counting predicate. The counter is a
// heap-allocated atomic<int> starting at 0, carried in a MoveWrapper so the
// by-copy lambda captures can share it. The predicate is true while the
// pre-increment count is below n.
1159 Future<Unit> times(const int n, F thunk) {
1160 auto count = folly::makeMoveWrapper(
1161 std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
1163 return folly::whileDo([=]() mutable {
1164 return (*count)->fetch_add(1) < n;
// Chains func onto every future in [first, last) and collects the resulting
// futures in input order. func is passed to then() by copy for each element.
1169 template <class It, class F, class ItT, class Result>
1170 std::vector<Future<Result>> map(It first, It last, F func) {
1171 std::vector<Future<Result>> results;
1172 for (auto it = first; it != last; it++) {
1173 results.push_back(it->then(func));
// Tag types for dispatching on the kind of retry policy: raw policies return
// bool, fut policies return Future<bool> (see retrying_policy_traits).
1183 struct retrying_policy_raw_tag {};
1184 struct retrying_policy_fut_tag {};
// Classifies a retry policy by the signature of its operator().
1186 template <class Policy>
1187 struct retrying_policy_traits {
1188 using ew = exception_wrapper;
1189 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
// has_op<Ret> is true when Policy has operator()(size_t, const ew&) returning
// Ret, in either const or non-const form.
1190 template <class Ret>
1191 using has_op = typename std::integral_constant<bool,
1192 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1193 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1194 using is_raw = has_op<bool>;
1195 using is_fut = has_op<Future<bool>>;
// tag resolves to the raw tag first, then the fut tag, and to void when
// neither signature matches.
1196 using tag = typename std::conditional<
1197 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1198 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// Core retry loop. On failure of the attempt's future f, the policy is asked
// (asynchronously) whether to retry; a true answer recurses with the policy
// and factory moved along, a false answer re-emits the original exception.
// NOTE(review): f is presumably the future returned by ff for attempt k --
// confirm.
1201 template <class Policy, class FF>
1202 typename std::result_of<FF(size_t)>::type
1203 retrying(size_t k, Policy&& p, FF&& ff) {
1204 using F = typename std::result_of<FF(size_t)>::type;
1205 using T = typename F::value_type;
1207 auto pm = makeMoveWrapper(p);
1208 auto ffm = makeMoveWrapper(ff);
1209 return f.onError([=](exception_wrapper x) mutable {
// q is the policy's Future<bool> verdict for attempt index k and exception x.
1210 auto q = (*pm)(k, x);
1211 auto xm = makeMoveWrapper(std::move(x));
1212 return q.then([=](bool r) mutable {
1214 ? retrying(k, pm.move(), ffm.move())
1215 : makeFuture<T>(xm.move());
// Adapter for bool-returning policies: wraps the policy in a lambda that
// lifts its answer into a Future<bool>, then starts the retry loop at
// attempt 0.
1220 template <class Policy, class FF>
1221 typename std::result_of<FF(size_t)>::type
1222 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1223 auto pm = makeMoveWrapper(std::move(p));
1224 auto q = [=](size_t k, exception_wrapper x) {
1225 return makeFuture<bool>((*pm)(k, x));
1227 return retrying(0, std::move(q), std::forward<FF>(ff));
// Adapter for Future<bool>-returning policies: no wrapping needed, the retry
// loop is started at attempt 0 with the policy forwarded as is.
1230 template <class Policy, class FF>
1231 typename std::result_of<FF(size_t)>::type
1232 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1233 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1236 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1237 template <class URNG>
1238 Duration retryingJitteredExponentialBackoffDur(
1240 Duration backoff_min,
1241 Duration backoff_max,
1242 double jitter_param,
// jitter is exp() of a normal(0, jitter_param) sample, i.e. a log-normally
// distributed multiplicative factor.
1245 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1246 auto jitter = std::exp(dist(rng));
// Unclamped backoff is jitter * backoff_min * 2^(n-1).
// NOTE(review): callers pass n as size_t; if n is unsigned here, n == 0 would
// wrap in n - 1 -- confirm n is always >= 1.
1247 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1248 return std::max(backoff_min, std::min(backoff_max, backoff));
// Builds the retry policy: it refuses once n reaches max_tries, otherwise
// defers to the wrapped policy p. When p approves, it sleeps for a jittered
// backoff before answering true.
1251 template <class Policy, class URNG>
1252 std::function<Future<bool>(size_t, const exception_wrapper&)>
1253 retryingPolicyCappedJitteredExponentialBackoff(
1255 Duration backoff_min,
1256 Duration backoff_max,
1257 double jitter_param,
1260 auto pm = makeMoveWrapper(std::move(p));
// The generator is moved into a shared_ptr so every copy of the returned
// callable draws from the same generator state.
1261 auto rngp = std::make_shared<URNG>(std::move(rng));
1262 return [=](size_t n, const exception_wrapper& ex) mutable {
1263 if (n == max_tries) { return makeFuture(false); }
1264 return (*pm)(n, ex).then([=](bool v) {
1265 if (!v) { return makeFuture(false); }
1266 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1267 n, backoff_min, backoff_max, jitter_param, *rngp);
1268 return futures::sleep(backoff).then([] { return true; });
// Tag-dispatched overload for bool-returning policies: the policy is wrapped
// in a lambda q that lifts its answer with makeFuture, and the call is then
// forwarded to another overload of the same name.
1273 template <class Policy, class URNG>
1274 std::function<Future<bool>(size_t, const exception_wrapper&)>
1275 retryingPolicyCappedJitteredExponentialBackoff(
1277 Duration backoff_min,
1278 Duration backoff_max,
1279 double jitter_param,
1282 retrying_policy_raw_tag) {
1283 auto pm = makeMoveWrapper(std::move(p));
1284 auto q = [=](size_t n, const exception_wrapper& e) {
1285 return makeFuture((*pm)(n, e));
1287 return retryingPolicyCappedJitteredExponentialBackoff(
// Tag-dispatched overload for Future<bool>-returning policies: forwards to
// another overload of the same name without wrapping the policy.
1296 template <class Policy, class URNG>
1297 std::function<Future<bool>(size_t, const exception_wrapper&)>
1298 retryingPolicyCappedJitteredExponentialBackoff(
1300 Duration backoff_min,
1301 Duration backoff_max,
1302 double jitter_param,
1305 retrying_policy_fut_tag) {
1306 return retryingPolicyCappedJitteredExponentialBackoff(
// Public entry point: classifies the policy with retrying_policy_traits and
// dispatches to the matching detail::retrying overload. ff is the factory
// that produces the future for a given attempt; its signature is
// FF(size_t).
1317 template <class Policy, class FF>
1318 typename std::result_of<FF(size_t)>::type
1319 retrying(Policy&& p, FF&& ff) {
1320 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1321 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
// Simplest policy: approves a retry while the attempt index n is below
// max_tries, ignoring the exception.
1325 std::function<bool(size_t, const exception_wrapper&)>
1326 retryingPolicyBasic(
1328 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
// Public overload taking a user policy and generator: classifies the policy
// with retrying_policy_traits and dispatches to the detail implementation.
1331 template <class Policy, class URNG>
1332 std::function<Future<bool>(size_t, const exception_wrapper&)>
1333 retryingPolicyCappedJitteredExponentialBackoff(
1335 Duration backoff_min,
1336 Duration backoff_max,
1337 double jitter_param,
1340 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1341 return detail::retryingPolicyCappedJitteredExponentialBackoff(
// Convenience overload without a user policy: uses an always-true policy p
// and forwards to another overload of the same name.
1352 std::function<Future<bool>(size_t, const exception_wrapper&)>
1353 retryingPolicyCappedJitteredExponentialBackoff(
1355 Duration backoff_min,
1356 Duration backoff_max,
1357 double jitter_param) {
1358 auto p = [](size_t, const exception_wrapper&) { return true; };
1359 return retryingPolicyCappedJitteredExponentialBackoff(
1370 // Instantiate the most common Future types to save compile time
// These are explicit instantiation declarations: including translation units
// will not implicitly instantiate these specialisations. The matching
// explicit instantiation definitions presumably live in a source file --
// confirm.
1371 extern template class Future<Unit>;
1372 extern template class Future<bool>;
1373 extern template class Future<int>;
1374 extern template class Future<int64_t>;
1375 extern template class Future<std::string>;
1376 extern template class Future<double>;
1378 } // namespace folly