2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
// FutureBatonType is the baton the blocking wait helpers in this file use.
// When FOLLY_FUTURE_USING_FIBER is nonzero it is the fiber baton; the second
// typedef is the plain folly::Baton<> alternative (presumably the other arm
// of this conditional -- confirm against the full file).
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
// Declaration only. Returns shared ownership of a Timekeeper; it is called
// from Future<T>::within(Duration, E, Timekeeper*) in this file to obtain one.
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
53 // Guarantees that the stored functor is destructed before the stored promise
54 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
//
// Pairs a continuation functor of type F with the Promise<T> that the
// continuation's outcome will fulfil. Every operation below is gated on
// before_barrier(), i.e. on the promise not having been fulfilled yet.
55 template <typename T, typename F>
56 class CoreCallbackState {
// Builds the state from a promise and any callable FF from which F can be
// constructed; noexcept exactly when that construction is noexcept.
58 template <typename FF>
59 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
60 noexcept(F(std::declval<FF>())))
61 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
62 assert(before_barrier());
// Move constructor: the functor and promise are taken over only while `that`
// is still before the barrier. func_ is placement-constructed here rather
// than initialised in a member-initialiser list.
65 CoreCallbackState(CoreCallbackState&& that) noexcept(
66 noexcept(F(std::declval<F>()))) {
67 if (that.before_barrier()) {
68 new (&func_) F(std::move(that.func_));
69 promise_ = that.stealPromise();
// Move assignment is deliberately unavailable.
73 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
// The destructor only has work to do while the promise is still unfulfilled.
75 ~CoreCallbackState() {
76 if (before_barrier()) {
// Calls the stored functor as an rvalue, perfectly forwarding args. The
// noexcept specification mirrors that call. Must be used before the barrier.
81 template <typename... Args>
82 auto invoke(Args&&... args) noexcept(
83 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
84 assert(before_barrier());
85 return std::move(func_)(std::forward<Args>(args)...);
// Takes the promise out of this state and fulfils it with the given Try.
88 void setTry(Try<T>&& t) {
89 stealPromise().setTry(std::move(t));
// Takes the promise out of this state and fulfils it with the given exception.
92 void setException(exception_wrapper&& ew) {
93 stealPromise().setException(std::move(ew));
// Moves the promise out to the caller. Asserts that the barrier has not been
// crossed yet.
96 Promise<T> stealPromise() noexcept {
97 assert(before_barrier());
99 return std::move(promise_);
// True while the stored promise has not been fulfilled.
103 bool before_barrier() const noexcept {
104 return !promise_.isFulfilled();
// Starts out empty-constructed, so a state that was move-constructed from an
// already-consumed source holds no usable promise.
110 Promise<T> promise_{detail::EmptyConstruct{}};
// Factory that deduces the functor type: returns a
// CoreCallbackState<T, decay_t<F>> built from the promise and the forwarded
// callable. Its noexcept specification mirrors that constructor call.
113 template <typename T, typename F>
114 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
115 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
116 std::declval<Promise<T>&&>(),
117 std::declval<F&&>()))) {
118 return CoreCallbackState<T, _t<std::decay<F>>>(
119 std::move(p), std::forward<F>(f));
// Move constructor: adopts other's core pointer and nulls it in the source,
// so `other` no longer refers to any core afterwards.
124 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
125 other.core_ = nullptr;
// Move assignment implemented as a swap of core pointers: `other` ends up
// holding whatever core this Future referred to before the assignment.
129 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
130 std::swap(core_, other.core_);
// Constructs a Future from a value: allocates a new Core that already holds
// a Try<T> built from the forwarded val.
135 template <class T2, typename>
136 Future<T>::Future(T2&& val)
137 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Constructor that participates only when T2 is Unit (the enable_if pointer
// parameter is otherwise ill-formed). Allocates a Core holding a
// default-constructed T.
140 template <typename T2>
141 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
142 : core_(new detail::Core<T>(Try<T>(T()))) {}
// Destructor (body not visible in this view).
145 Future<T>::~Future() {
// detach(): notifies the core, via detachFuture(), that the Future side is
// letting go of it.
// NOTE(review): any null check on core_ around this call is not visible
// here -- confirm against the full file.
150 void Future<T>::detach() {
152 core_->detachFuture();
// throwIfInvalid(): validity check used by other members (body not visible
// in this view).
158 void Future<T>::throwIfInvalid() const {
// setCallback_(): hands the callable straight to the core as its callback,
// perfectly forwarded.
165 void Future<T>::setCallback_(F&& func) {
167 core_->setCallback(std::forward<F>(func));
// unwrap(): available only when isFuture<F>::value holds. Chains a
// continuation that receives the inner future and returns it unchanged, so
// the result is a Future of the inner type rather than a nested Future.
174 typename std::enable_if<isFuture<F>::value,
175 Future<typename isFuture<T>::Inner>>::type
176 Future<T>::unwrap() {
177 return then([](Future<typename isFuture<T>::Inner> internal_future) {
178 return internal_future;
184 // Variant: returns a value
185 // e.g. f.then([](Try<T>&& t){ return t.value(); });
//
// Selected when the callback does not return a Future
// (!R::ReturnsFuture::value). isTry says whether the callback takes a Try<T>
// or the unwrapped value; Args is the callback's (zero or one) argument type.
187 template <typename F, typename R, bool isTry, typename... Args>
188 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
189 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
190 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
// B is the value type carried by the future this call returns.
191 typedef typename R::ReturnsFuture::Inner B;
// The new promise inherits this future's interrupt handler.
196 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
198 // grab the Future now before we lose our handle on the Promise
199 auto f = p.getFuture();
// The returned future starts on the same executor as this one.
200 f.core_->setExecutorNoLock(getExecutor());
202 /* This is a bit tricky.
204 We can't just close over *this in case this Future gets moved. So we
205 make a new dummy Future. We could figure out something more
206 sophisticated that avoids making a new Future object when it can, as an
207 optimization. But this is correct.
209 core_ can't be moved, it is explicitly disallowed (as is copying). But
210 if there's ever a reason to allow it, this is one place that makes that
211 assumption and would need to be fixed. We use a standard shared pointer
212 for core_ (by copying it in), which means in essence obj holds a shared
213 pointer to itself. But this shouldn't leak because Promise will not
214 outlive the continuation, because Promise will setException() with a
215 broken Promise if it is destructed before completed. We could use a
216 weak pointer but it would have to be converted to a shared pointer when
217 func is executed (because the Future returned by func may possibly
218 persist beyond the callback, if it gets moved), and so it is an
219 optimization to just make it shared from the get-go.
221 Two subtle but important points about this design. detail::Core has no
222 back pointers to Future or Promise, so if Future or Promise get moved
223 (and they will be moved in performant code) we don't have to do
224 anything fancy. And because we store the continuation in the
225 detail::Core, not in the Future, we can execute the continuation even
226 after the Future has gone out of scope. This is an intentional design
227 decision. It is likely we will want to be able to cancel a continuation
228 in some circumstances, but I think it should be explicit not implicit
229 in the destruction of the Future used to create it.
// The callback owns the promise and the user functor through `state`.
232 [state = detail::makeCoreCallbackState(
233 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
// A value-taking callback is never invoked on an exceptional Try; the
// exception is forwarded to the new promise instead.
234 if (!isTry && t.hasException()) {
235 state.setException(std::move(t.exception()));
// Otherwise invoke the functor; makeTryWith wraps the call so its outcome
// reaches the promise as a Try.
237 state.setTry(makeTryWith(
238 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
245 // Variant: returns a Future
246 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
//
// Selected when the callback itself returns a Future
// (R::ReturnsFuture::value). The returned future is completed with the
// outcome of the future the callback produces.
248 template <typename F, typename R, bool isTry, typename... Args>
249 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
250 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
251 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
// B is the value type of the future returned by the callback.
252 typedef typename R::ReturnsFuture::Inner B;
// The new promise inherits this future's interrupt handler.
257 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
259 // grab the Future now before we lose our handle on the Promise
260 auto f = p.getFuture();
261 f.core_->setExecutorNoLock(getExecutor());
// The callback owns the promise and the user functor through `state`.
264 [state = detail::makeCoreCallbackState(
265 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
// A value-taking callback is skipped when t holds an exception; that
// exception becomes the result instead.
267 if (!isTry && t.hasException()) {
268 return std::move(t.exception());
271 auto f2 = state.invoke(t.template get<isTry, Args>()...);
272 // that didn't throw, now we can steal p
// The inner future's result is piped into the stolen promise.
273 f2.setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
274 p.setTry(std::move(b));
// An empty exception_wrapper is the "no error" result of this step.
276 return exception_wrapper();
// Anything thrown while invoking the functor is captured as an
// exception_wrapper.
277 } catch (const std::exception& e) {
278 return exception_wrapper(std::current_exception(), e);
280 return exception_wrapper(std::current_exception());
// A captured error is delivered through the promise still held in `state`.
285 state.setException(std::move(ew));
// then(member-function pointer, instance): uses instance->*func as the
// continuation. `instance` is captured as a raw pointer, so the pointee must
// still be alive when the continuation runs.
292 template <typename T>
293 template <typename R, typename Caller, typename... Args>
294 Future<typename isFuture<R>::Inner>
295 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
// FirstArg is the member function's first parameter type with cv and
// reference qualifiers removed; it decides whether a Try or the plain value
// is passed.
296 typedef typename std::remove_cv<
297 typename std::remove_reference<
298 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
299 return then([instance, func](Try<T>&& t){
300 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// then(Executor*, ...): same result type as then(arg, args...) without the
// executor argument. The current executor is saved in oldX before chaining.
// NOTE(review): the step that switches to x, and whatever follows the
// trailing '.' on the return expression, are not visible in this view --
// presumably the chain hops back to oldX; confirm.
305 template <class Executor, class Arg, class... Args>
306 auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
307 -> decltype(this->then(std::forward<Arg>(arg),
308 std::forward<Args>(args)...))
310 auto oldX = getExecutor();
312 return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
// then() with no callback: chains an empty, argument-less lambda, yielding a
// Future<Unit>.
317 Future<Unit> Future<T>::then() {
318 return then([] () {});
321 // onError where the callback returns T
//
// Chosen when F is not callable with an exception_wrapper and does not
// return a Future. Exn, the callback's first parameter type, is the exception
// type being handled.
324 typename std::enable_if<
325 !detail::callableWith<F, exception_wrapper>::value &&
326 !detail::Extract<F>::ReturnsFuture::value,
328 Future<T>::onError(F&& func) {
329 typedef typename detail::Extract<F>::FirstArg Exn;
331 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
332 "Return type of onError callback must be T or Future<T>");
// The new promise inherits this future's interrupt handler.
335 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
336 auto f = p.getFuture();
339 [state = detail::makeCoreCallbackState(
340 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
// withException runs the handler only when t holds an Exn; the handler's
// outcome is delivered to the promise as a Try.
341 if (!t.template withException<Exn>([&](Exn& e) {
342 state.setTry(makeTryWith([&] { return state.invoke(e); }));
// No matching exception: pass the original Try through untouched.
344 state.setTry(std::move(t));
351 // onError where the callback returns Future<T>
//
// Chosen when F is not callable with an exception_wrapper and returns a
// Future. The static_assert requires that return type to be exactly
// Future<T>.
354 typename std::enable_if<
355 !detail::callableWith<F, exception_wrapper>::value &&
356 detail::Extract<F>::ReturnsFuture::value,
358 Future<T>::onError(F&& func) {
360 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
361 "Return type of onError callback must be T or Future<T>");
362 typedef typename detail::Extract<F>::FirstArg Exn;
365 auto f = p.getFuture();
368 [state = detail::makeCoreCallbackState(
369 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
// The handler runs only when t holds an exception of type Exn.
370 if (!t.template withException<Exn>([&](Exn& e) {
// Invoke the handler and pipe the future it returns into the stolen promise.
373 auto f2 = state.invoke(e);
374 f2.setCallback_([p = state.stealPromise()](
375 Try<T> && t2) mutable { p.setTry(std::move(t2)); });
// An empty wrapper means the handler itself did not throw.
376 return exception_wrapper();
377 } catch (const std::exception& e2) {
378 return exception_wrapper(std::current_exception(), e2);
380 return exception_wrapper(std::current_exception());
// An exception thrown by the handler becomes the result.
384 state.setException(std::move(ew));
// No matching exception: forward the original Try.
387 state.setTry(std::move(t));
// ensure(): chains a continuation that owns the callable (captured as funcw)
// and then passes the original Try through unchanged, so the result of this
// future is preserved.
// NOTE(review): the call to funcw itself is not visible in this view.
396 Future<T> Future<T>::ensure(F&& func) {
397 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
399 return makeFuture(std::move(t));
// onTimeout(): applies within(dur, tk) and attaches an error handler for
// TimedOut that calls func with no arguments and returns its result.
405 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
406 return within(dur, tk).onError([funcw = std::forward<F>(func)](
407 TimedOut const&) { return std::move(funcw)(); });
// onError overload for handlers that accept an exception_wrapper and return
// a Future<T> (enforced by the static_assert).
412 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
413 detail::Extract<F>::ReturnsFuture::value,
415 Future<T>::onError(F&& func) {
417 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
418 "Return type of onError callback must be T or Future<T>");
421 auto f = p.getFuture();
423 [state = detail::makeCoreCallbackState(
424 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
// Any exception is handled here; there is no filtering by exception type.
425 if (t.hasException()) {
// Invoke the handler with the wrapper and pipe the returned future's result
// into the stolen promise.
428 auto f2 = state.invoke(std::move(t.exception()));
429 f2.setCallback_([p = state.stealPromise()](Try<T> t2) mutable {
430 p.setTry(std::move(t2));
432 return exception_wrapper();
433 } catch (const std::exception& e2) {
434 return exception_wrapper(std::current_exception(), e2);
436 return exception_wrapper(std::current_exception());
// An exception thrown by the handler becomes the result.
440 state.setException(std::move(ew));
// A Try without an exception is forwarded unchanged.
443 state.setTry(std::move(t));
450 // onError(exception_wrapper) that returns T
//
// Chosen when F accepts an exception_wrapper and does not return a Future.
// NOTE(review): the static_assert compares Extract<F>::Return (not
// RawReturn) with Future<T>; presumably Return is the Future-wrapped form of
// the callback's result -- confirm against detail::Extract.
453 typename std::enable_if<
454 detail::callableWith<F, exception_wrapper>::value &&
455 !detail::Extract<F>::ReturnsFuture::value,
457 Future<T>::onError(F&& func) {
459 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
460 "Return type of onError callback must be T or Future<T>");
463 auto f = p.getFuture();
465 [state = detail::makeCoreCallbackState(
466 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
// On any exception, run the handler and deliver its outcome as a Try.
467 if (t.hasException()) {
468 state.setTry(makeTryWith(
469 [&] { return state.invoke(std::move(t.exception())); }));
// A Try without an exception is forwarded unchanged.
471 state.setTry(std::move(t));
// value(): returns a reference to the value held in the core's Try.
479 typename std::add_lvalue_reference<T>::type Future<T>::value() {
482 return core_->getTry().value();
// Const overload: same access, yielding a reference to const T.
486 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
489 return core_->getTry().value();
// getTry(): returns a reference to the Try stored in the core.
493 Try<T>& Future<T>::getTry() {
496 return core_->getTry();
// getTryVia(): first waits by driving executor e (waitVia), then returns the
// Try.
500 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
501 return waitVia(e).getTry();
// poll(): non-blocking check. When the core is ready, the Try is moved out
// of the core into the optional `o`.
505 Optional<Try<T>> Future<T>::poll() {
507 if (core_->ready()) {
508 o = std::move(core_->getTry());
// via() on an rvalue Future: sets the executor and priority on this future
// and returns it by move, so no new core is created.
514 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
517 setExecutor(executor, priority);
519 return std::move(*this);
// via() on an lvalue Future: a fresh future f is fulfilled by a continuation
// chained on this one, and f is then moved onto the requested executor.
523 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
527 auto f = p.getFuture();
// The continuation forwards this future's Try into the new promise.
528 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
529 return std::move(f).via(executor, priority);
// Free function via(x, func): runs func as a continuation of via(x). The
// result type is the Future of whatever func() yields, flattened through
// isFuture<...>::Inner.
532 template <class Func>
533 auto via(Executor* x, Func&& func)
534 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
535 // TODO make this actually more performant. :-P #7260175
536 return via(x).then(std::forward<Func>(func));
// isReady(): true once the core reports ready.
540 bool Future<T>::isReady() const {
542 return core_->ready();
// hasValue(): true when the stored Try holds a value.
546 bool Future<T>::hasValue() {
547 return getTry().hasValue();
// hasException(): true when the stored Try holds an exception.
551 bool Future<T>::hasException() {
552 return getTry().hasException();
// raise(): forwards the exception to the core's raise().
556 void Future<T>::raise(exception_wrapper exception) {
557 core_->raise(std::move(exception));
// makeFuture(T&&): wraps the forwarded value in a Try of the decayed type and
// delegates to the Try overload of makeFuture.
563 Future<typename std::decay<T>::type> makeFuture(T&& t) {
564 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// makeFuture(): a Future<Unit> built from Unit{}.
567 inline // for multiple translation units
568 Future<Unit> makeFuture() {
569 return makeFuture(Unit{});
572 // makeFutureWith(Future<T>()) -> Future<T>
//
// Overload for callables that already return a Future: the callable's future
// is returned as-is, and an exception thrown by the call is turned into a
// failed future of the same inner type.
574 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
575 typename std::result_of<F()>::type>::type
576 makeFutureWith(F&& func) {
578 typename isFuture<typename std::result_of<F()>::type>::Inner;
580 return std::forward<F>(func)();
// std::exception-derived errors keep a reference to the typed exception.
581 } catch (std::exception& e) {
582 return makeFuture<InnerType>(
583 exception_wrapper(std::current_exception(), e));
// Any other thrown object is wrapped from the exception_ptr alone.
585 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
589 // makeFutureWith(T()) -> Future<T>
590 // makeFutureWith(void()) -> Future<Unit>
//
// Overload for callables that do not return a Future. The call is wrapped by
// makeTryWith and the resulting Try is turned into a Future of the lifted
// result type (Unit::Lift).
592 typename std::enable_if<
593 !(isFuture<typename std::result_of<F()>::type>::value),
594 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
595 makeFutureWith(F&& func) {
597 typename Unit::Lift<typename std::result_of<F()>::type>::type;
598 return makeFuture<LiftedResult>(
599 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// Failed future from a std::exception_ptr.
603 Future<T> makeFuture(std::exception_ptr const& e) {
604 return makeFuture(Try<T>(e));
// Failed future from an exception_wrapper.
608 Future<T> makeFuture(exception_wrapper ew) {
609 return makeFuture(Try<T>(std::move(ew)));
// Failed future from an exception object; enabled only for types derived
// from std::exception. The exception is copied into an exception_wrapper.
612 template <class T, class E>
613 typename std::enable_if<std::is_base_of<std::exception, E>::value,
615 makeFuture(E const& e) {
616 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// Base overload: allocates a Core that already holds the given Try. The
// other makeFuture overloads funnel into this one.
620 Future<T> makeFuture(Try<T>&& t) {
621 return Future<T>(new detail::Core<T>(std::move(t)));
// Free function via(): a Future<Unit> made by makeFuture(), then bound to
// the given executor and priority.
625 Future<Unit> via(Executor* executor, int8_t priority) {
626 return makeFuture().via(executor, priority);
629 // mapSetCallback calls func(i, Try<T>) when every future completes
//
// More precisely: for each future in [first, last) a callback is installed
// that calls func(i, result), where i is that future's zero-based position
// in the range. func is copied into every callback.
631 template <class T, class InputIterator, class F>
632 void mapSetCallback(InputIterator first, InputIterator last, F func) {
633 for (size_t i = 0; first != last; ++first, ++i) {
634 first->setCallback_([func, i](Try<T>&& t) {
635 func(i, std::move(t));
640 // collectAll (variadic)
//
// Creates a shared CollectAllVariadicContext for the futures' value types,
// hands the context and the forwarded futures to collectVariadicHelper, and
// returns the future of the context's promise.
642 template <typename... Fs>
643 typename detail::CollectAllVariadicContext<
644 typename std::decay<Fs>::type::value_type...>::type
645 collectAll(Fs&&... fs) {
646 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
647 typename std::decay<Fs>::type::value_type...>>();
648 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
649 ctx, std::forward<Fs>(fs)...);
650 return ctx->p.getFuture();
653 // collectAll (iterator)
//
// Returns a future of a vector with one Try per input future, in input order.
655 template <class InputIterator>
658 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
659 collectAll(InputIterator first, InputIterator last) {
661 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// Shared state: one result slot per input. The promise is fulfilled from the
// destructor, i.e. once the last shared_ptr copy (each callback holds one)
// is released.
663 struct CollectAllContext {
664 CollectAllContext(size_t n) : results(n) {}
665 ~CollectAllContext() {
666 p.setValue(std::move(results));
668 Promise<std::vector<Try<T>>> p;
669 std::vector<Try<T>> results;
673 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
// Each callback writes only its own slot i.
674 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
675 ctx->results[i] = std::move(t);
677 return ctx->p.getFuture();
680 // collect (iterator)
//
// Shared state for collect(first, last): one result slot per input, plus a
// flag (`threw`) that makes sure the promise is completed only once.
684 template <typename T>
685 struct CollectContext {
// Placeholder constructor taking (and ignoring) a size; used by the void
// specialisations of the aliases below.
687 explicit Nothing(int /* n */) {}
// Result is what the caller's future carries. For non-void T it is
// std::vector<T>.
690 using Result = typename std::conditional<
691 std::is_void<T>::value,
693 std::vector<T>>::type;
// InternalResult is the working storage. For non-void T each slot is an
// Optional<T> that is filled in as futures complete.
695 using InternalResult = typename std::conditional<
696 std::is_void<T>::value,
698 std::vector<Optional<T>>>::type;
700 explicit CollectContext(size_t n) : result(n) {}
// Success path: runs only if `threw` was not already set. Unwraps every
// Optional<T> into a plain vector<T> and fulfils the promise with it.
702 if (!threw.exchange(true)) {
703 // map Optional<T> -> T
704 std::vector<T> finalResult;
705 finalResult.reserve(result.size());
706 std::transform(result.begin(), result.end(),
707 std::back_inserter(finalResult),
708 [](Optional<T>& o) { return std::move(o.value()); });
709 p.setValue(std::move(finalResult));
// Stores the value of a completed future in slot i (moved out of the Try).
712 inline void setPartialResult(size_t i, Try<T>& t) {
713 result[i] = std::move(t.value());
716 InternalResult result;
// Set by whichever path completes the promise first.
717 std::atomic<bool> threw {false};
// collect (iterator): the returned future fails with the first exception
// observed among the inputs; otherwise it carries the collected values
// (see detail::CollectContext).
722 template <class InputIterator>
723 Future<typename detail::CollectContext<
724 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
725 collect(InputIterator first, InputIterator last) {
727 typename std::iterator_traits<InputIterator>::value_type::value_type T;
729 auto ctx = std::make_shared<detail::CollectContext<T>>(
730 std::distance(first, last));
731 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
732 if (t.hasException()) {
// exchange(true) lets only the first failing future set the exception.
733 if (!ctx->threw.exchange(true)) {
734 ctx->p.setException(std::move(t.exception()));
// Values are recorded only while no failure has been seen.
736 } else if (!ctx->threw) {
737 ctx->setPartialResult(i, t);
740 return ctx->p.getFuture();
743 // collect (variadic)
//
// Creates a shared CollectVariadicContext for the futures' value types,
// passes the context and the forwarded futures to collectVariadicHelper, and
// returns the future of the context's promise.
745 template <typename... Fs>
746 typename detail::CollectVariadicContext<
747 typename std::decay<Fs>::type::value_type...>::type
748 collect(Fs&&... fs) {
749 auto ctx = std::make_shared<detail::CollectVariadicContext<
750 typename std::decay<Fs>::type::value_type...>>();
751 detail::collectVariadicHelper<detail::CollectVariadicContext>(
752 ctx, std::forward<Fs>(fs)...);
753 return ctx->p.getFuture();
756 // collectAny (iterator)
//
// The returned future completes with (index, Try) of the first input future
// to complete, whether it holds a value or an exception.
758 template <class InputIterator>
763 std::iterator_traits<InputIterator>::value_type::value_type>>>
764 collectAny(InputIterator first, InputIterator last) {
766 typename std::iterator_traits<InputIterator>::value_type::value_type T;
768 struct CollectAnyContext {
769 CollectAnyContext() {}
770 Promise<std::pair<size_t, Try<T>>> p;
// Claimed by the first callback to run; later completions are ignored.
771 std::atomic<bool> done {false};
774 auto ctx = std::make_shared<CollectAnyContext>();
775 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
776 if (!ctx->done.exchange(true)) {
777 ctx->p.setValue(std::make_pair(i, std::move(t)));
780 return ctx->p.getFuture();
783 // collectAnyWithoutException (iterator)
//
// The returned future completes with (index, value) of the first input to
// finish without an exception. If every input fails, it fails with the
// exception of the last one counted.
785 template <class InputIterator>
788 typename std::iterator_traits<InputIterator>::value_type::value_type>>
789 collectAnyWithoutException(InputIterator first, InputIterator last) {
791 typename std::iterator_traits<InputIterator>::value_type::value_type T;
793 struct CollectAnyWithoutExceptionContext {
794 CollectAnyWithoutExceptionContext(){}
795 Promise<std::pair<size_t, T>> p;
// Claimed by the first successful completion.
796 std::atomic<bool> done{false};
// Counts completions that did not win (failures, and successes arriving
// after the winner).
797 std::atomic<size_t> nFulfilled{0};
801 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
802 ctx->nTotal = size_t(std::distance(first, last));
804 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
805 if (!t.hasException() && !ctx->done.exchange(true)) {
806 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
// The winner never increments nFulfilled, so the counter can reach nTotal
// only when no input succeeded; in that case t holds an exception.
807 } else if (++ctx->nFulfilled == ctx->nTotal) {
808 ctx->p.setException(t.exception());
811 return ctx->p.getFuture();
814 // collectN (iterator)
//
// The returned future carries the (index, Try) pairs of n completed inputs.
// It fails with "Not enough futures" when the range holds fewer than n
// futures.
816 template <class InputIterator>
817 Future<std::vector<std::pair<size_t, Try<typename
818 std::iterator_traits<InputIterator>::value_type::value_type>>>>
819 collectN(InputIterator first, InputIterator last, size_t n) {
821 std::iterator_traits<InputIterator>::value_type::value_type T;
822 typedef std::vector<std::pair<size_t, Try<T>>> V;
824 struct CollectNContext {
// Number of inputs that have completed so far.
826 std::atomic<size_t> completed = {0};
829 auto ctx = std::make_shared<CollectNContext>();
831 if (size_t(std::distance(first, last)) < n) {
832 ctx->p.setException(std::runtime_error("Not enough futures"));
834 // for each completed Future, increase count and add to vector, until we
835 // have n completed futures at which point we fulfil our Promise with the
837 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
// c is this completion's 1-based arrival order.
838 auto c = ++ctx->completed;
// NOTE(review): the visible lines append to ctx->v with no lock; if callbacks
// can run on different threads at once this looks like a data race -- confirm.
840 assert(ctx->v.size() < n);
841 ctx->v.emplace_back(i, std::move(t));
843 ctx->p.setTry(Try<V>(std::move(ctx->v)));
849 return ctx->p.getFuture();
// reduce (iterator): folds the results of the futures in [first, last) into
// `initial` using func, strictly in iteration order. func may take either
// the value or a Try of each element (detected with callableWith).
854 template <class It, class T, class F>
855 Future<T> reduce(It first, It last, T&& initial, F&& func) {
// This returns the initial value as a ready future (the guarding condition
// is not visible in this view; presumably the empty-range case).
857 return makeFuture(std::move(initial));
860 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
862 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
865 typedef isTry<Arg> IsTry;
// func is shared between all continuations of the chain.
// NOTE(review): F is a forwarding-reference parameter, so make_shared<F>
// would name a reference type if an lvalue callable were passed -- confirm
// how callers invoke this.
867 auto sfunc = std::make_shared<F>(std::move(func));
// First step: combine `initial` with the first future's result.
869 auto f = first->then(
870 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
872 std::move(minitial), head.template get<IsTry::value, Arg&&>());
// Remaining steps: wait for both the running accumulator and the next
// element, then apply func to the pair.
875 for (++first; first != last; ++first) {
876 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
877 return (*sfunc)(std::move(std::get<0>(t).value()),
878 // Either return a ItT&& or a Try<ItT>&& depending
879 // on the type of the argument of func.
880 std::get<1>(t).template get<IsTry::value, Arg&&>());
887 // window (collection)
//
// Applies func to every element of `input`, starting at most n calls up
// front; each completion starts the next pending element. Returns one future
// per input element, in input order.
889 template <class Collection, class F, class ItT, class Result>
890 std::vector<Future<Result>>
891 window(Collection input, F func, size_t n) {
892 struct WindowContext {
// One promise is created per input element.
893 WindowContext(Collection&& i, F&& fn)
894 : input_(std::move(i)), promises_(input_.size()),
// Index of the next input element to start; claimed atomically by spawn().
897 std::atomic<size_t> i_ {0};
899 std::vector<Promise<Result>> promises_;
// Claims the next index and, if it is still within the input, launches func
// on that element.
902 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
903 size_t i = ctx->i_++;
904 if (i < ctx->input_.size()) {
905 // Using setCallback_ directly since we don't need the Future
906 ctx->func_(std::move(ctx->input_[i])).setCallback_(
907 // ctx is captured by value
908 [ctx, i](Try<Result>&& t) {
909 ctx->promises_[i].setTry(std::move(t));
910 // Chain another future onto this one
// spawn() takes its argument by const reference, so this std::move does not
// actually move the captured pointer.
911 spawn(std::move(ctx));
// Never start more than there are inputs.
917 auto max = std::min(n, input.size());
919 auto ctx = std::make_shared<WindowContext>(
920 std::move(input), std::move(func));
922 for (size_t i = 0; i < max; ++i) {
923 // Start the first n Futures
924 WindowContext::spawn(ctx);
// Hand out one future per promise.
927 std::vector<Future<Result>> futures;
928 futures.reserve(ctx->promises_.size());
929 for (auto& promise : ctx->promises_) {
930 futures.emplace_back(promise.getFuture());
// Future<T>::reduce(): once this future's collection of values (`vals`) is
// available, folds it left-to-right into `initial` with func.
939 template <class I, class F>
940 Future<I> Future<T>::reduce(I&& initial, F&& func) {
// The initial value and the functor are captured by the continuation.
942 minitial = std::forward<I>(initial),
943 mfunc = std::forward<F>(func)
// The accumulator is moved into each call and replaced by the call's result.
945 auto ret = std::move(minitial);
946 for (auto& val : vals) {
947 ret = mfunc(std::move(ret), std::move(val));
953 // unorderedReduce (iterator)
//
// Like reduce(), but folds results in completion order rather than
// iteration order.
955 template <class It, class T, class F, class ItT, class Arg>
956 Future<T> unorderedReduce(It first, It last, T initial, F func) {
// This returns the initial value as a ready future (the guarding condition
// is not visible in this view; presumably the empty-range case).
958 return makeFuture(std::move(initial));
961 typedef isTry<Arg> IsTry;
963 struct UnorderedReduceContext {
// memo_ starts as a ready future holding the initial value.
964 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
965 : lock_(), memo_(makeFuture<T>(std::move(memo))),
966 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
968 folly::MicroSpinLock lock_; // protects memo_ and numThens_
971 size_t numThens_; // how many Futures completed and called .then()
972 size_t numFutures_; // how many Futures in total
976 auto ctx = std::make_shared<UnorderedReduceContext>(
977 std::move(initial), std::move(func), std::distance(first, last));
982 [ctx](size_t /* i */, Try<ItT>&& t) {
983 // Futures can be completed in any order, simultaneously.
984 // To make this non-blocking, we create a new Future chain in
985 // the order of completion to reduce the values.
986 // The spinlock just protects chaining a new Future, not actually
987 // executing the reduce, which should be really fast.
988 folly::MSLGuard lock(ctx->lock_);
// Extend the chain: the next accumulator is func_(previous, this result).
990 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
991 // Either return a ItT&& or a Try<ItT>&& depending
992 // on the type of the argument of func.
993 return ctx->func_(std::move(v),
994 mt.template get<IsTry::value, Arg&&>());
996 if (++ctx->numThens_ == ctx->numFutures_) {
997 // After reducing the value of the last Future, fulfill the Promise
998 ctx->memo_.setCallback_(
999 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1003 return ctx->promise_.getFuture();
// within(dur, tk): convenience overload that uses a default-constructed
// TimedOut as the timeout exception.
1009 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1010 return within(dur, TimedOut(), tk);
// within(dur, e, tk): races this future against a timer of length dur. The
// returned future gets this future's result if that arrives first, otherwise
// it fails (with the timer's own error if it had one, else with e).
1015 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
// Shared state for the race.
1018 Context(E ex) : exception(std::move(ex)), promise() {}
1020 Future<Unit> thisFuture;
// Set by whichever side finishes first; the loser must not touch the promise.
1022 std::atomic<bool> token {false};
// Holds the singleton timekeeper alive for the duration of this call when
// the singleton is used.
1025 std::shared_ptr<Timekeeper> tks;
1027 tks = folly::detail::getTimekeeperSingleton();
1028 tk = DCHECK_NOTNULL(tks.get());
1031 auto ctx = std::make_shared<Context>(std::move(e));
// NOTE(review): ctx stores thisFuture while thisFuture's continuation
// captures ctx -- whether this creates a lasting reference cycle depends on
// when the continuation is released; confirm.
1033 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1034 // TODO: "this" completed first, cancel "after"
1035 if (ctx->token.exchange(true) == false) {
1036 ctx->promise.setTry(std::move(t));
1040 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
1041 // "after" completed first, cancel "this"
// The raise happens before the token check, i.e. even when this future
// already won the race.
1042 ctx->thisFuture.raise(TimedOut());
1043 if (ctx->token.exchange(true) == false) {
1044 if (t.hasException()) {
1045 ctx->promise.setException(std::move(t.exception()));
1047 ctx->promise.setException(std::move(ctx->exception));
// The result is delivered on this future's executor.
1052 return ctx->promise.getFuture().via(getExecutor());
// delayed(): waits for both this future and a sleep of length dur, then
// yields this future's original result (value or exception); the sleep's own
// result is ignored.
1058 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1059 return collectAll(*this, futures::sleep(dur, tk))
1060 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1061 Try<T>& t = std::get<0>(tup);
1062 return makeFuture<T>(std::move(t));
// waitImpl(f): blocks the caller until f is ready. A baton local to this
// call is posted by a callback installed on f.
1069 void waitImpl(Future<T>& f) {
1070 // short-circuit if there's nothing to do
1071 if (f.isReady()) return;
1073 FutureBatonType baton;
// The baton is captured by reference, so this function must not return
// before the callback has run.
1074 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1076 assert(f.isReady());
// waitImpl(f, dur): waits for f for at most dur. The result is routed
// through a new promise, and `ret` is the future for it.
1080 void waitImpl(Future<T>& f, Duration dur) {
1081 // short-circuit if there's nothing to do
1087 auto ret = promise.getFuture();
// The baton is heap-allocated and shared with the callback, so the callback
// stays valid even if the wait below times out and this function returns.
1088 auto baton = std::make_shared<FutureBatonType>();
1089 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1090 promise.setTry(std::move(t));
// timed_wait returning true is treated as "the result arrived in time".
1094 if (baton->timed_wait(dur)) {
1095 assert(f.isReady());
// waitViaImpl(f, e): makes f ready by running work on the drivable executor
// e until f reports ready.
1100 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1101 // Set callback so to ensure that the via executor has something on it
1102 // so that once the preceding future triggers this callback, drive will
1103 // always have a callback to satisfy it
// f is replaced by an identity continuation scheduled on e.
1106 f = f.via(e).then([](T&& t) { return std::move(t); });
1107 while (!f.isReady()) {
1110 assert(f.isReady());
// wait() on an lvalue: blocks until ready (detail::waitImpl).
1116 Future<T>& Future<T>::wait() & {
1117 detail::waitImpl(*this);
// wait() on an rvalue: same blocking wait, then hands the future back by
// rvalue reference.
1122 Future<T>&& Future<T>::wait() && {
1123 detail::waitImpl(*this);
1124 return std::move(*this);
// wait(dur) on an lvalue: waits for at most dur (detail::waitImpl).
1128 Future<T>& Future<T>::wait(Duration dur) & {
1129 detail::waitImpl(*this, dur);
// wait(dur) on an rvalue: same bounded wait, then hands the future back by
// rvalue reference.
1134 Future<T>&& Future<T>::wait(Duration dur) && {
1135 detail::waitImpl(*this, dur);
1136 return std::move(*this);
// waitVia(e) on an lvalue: drives executor e until this future is ready
// (detail::waitViaImpl).
1140 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1141 detail::waitViaImpl(*this, e);
// waitVia(e) on an rvalue: same, then hands the future back by rvalue
// reference.
1146 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1147 detail::waitViaImpl(*this, e);
1148 return std::move(*this);
// get(): blocks until ready, then moves the value out of the future.
1152 T Future<T>::get() {
1153 return std::move(wait().value());
// get(dur): moves the value out after a bounded wait.
// NOTE(review): the wait and the handling of the not-ready case are not
// visible in this view.
1157 T Future<T>::get(Duration dur) {
1160 return std::move(value());
// getVia(e): drives executor e until ready, then moves the value out.
1167 T Future<T>::getVia(DrivableExecutor* e) {
1168 return std::move(waitVia(e).value());
// Compares the values held by two Trys with operator==. Both Trys are
// expected to hold values; willEqual() checks that before calling this.
1174 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1175 return t1.value() == t2.value();
// willEqual(f): a future of whether this future and f end up holding equal
// values. The comparison is made only when both results are values; the
// outcome for the other case is not visible in this view.
1181 Future<bool> Future<T>::willEqual(Future<T>& f) {
1182 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1183 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1184 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
// filter(predicate): chains a check on the value. If the predicate returns
// false for it, the continuation throws PredicateDoesNotObtain.
1193 Future<T> Future<T>::filter(F&& predicate) {
1194 return this->then([p = std::forward<F>(predicate)](T val) {
// The predicate sees the value only through a const reference.
1195 T const& valConstRef = val;
1196 if (!p(valConstRef)) {
1197 throw PredicateDoesNotObtain();
// thenMulti base case: a single callback.
1204 template <class Callback>
1205 auto Future<T>::thenMulti(Callback&& fn)
1206 -> decltype(this->then(std::forward<Callback>(fn))) {
1207 // thenMulti with one callback is just a then
1208 return then(std::forward<Callback>(fn));
// thenMulti recursive case: chain the first callback, then recurse on the
// remaining ones.
1212 template <class Callback, class... Callbacks>
1213 auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
1214 -> decltype(this->then(std::forward<Callback>(fn)).
1215 thenMulti(std::forward<Callbacks>(fns)...)) {
1216 // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
1217 return then(std::forward<Callback>(fn)).
1218 thenMulti(std::forward<Callbacks>(fns)...);
// thenMultiWithExecutor, several callbacks: runs the whole chain and then
// returns to the executor that was current on entry (oldX).
// NOTE(review): the statement that switches to x before chaining is not
// visible in this view.
1222 template <class Callback, class... Callbacks>
1223 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
1225 -> decltype(this->then(std::forward<Callback>(fn)).
1226 thenMulti(std::forward<Callbacks>(fns)...)) {
1227 // thenMultiWithExecutor with two callbacks is
1228 // via(x).then(a).thenMulti(b, ...).via(oldX)
1229 auto oldX = getExecutor();
1231 return then(std::forward<Callback>(fn)).
1232 thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
// thenMultiWithExecutor, single callback: delegates to then(x, fn).
1236 template <class Callback>
1237 auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
1238 -> decltype(this->then(std::forward<Callback>(fn))) {
1239 // thenMulti with one callback is just a then with an executor
1240 return then(x, std::forward<Callback>(fn));
// when(p, thunk): if p is true, calls thunk and converts its future to
// Future<Unit>; otherwise thunk is not called and a ready Future<Unit> is
// returned.
1244 inline Future<Unit> when(bool p, F&& thunk) {
1245 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// whileDo(predicate, thunk): asynchronous loop. Each round calls thunk and,
// when its future completes, recurses with the same predicate and thunk;
// the final return yields a ready Future<Unit>.
// NOTE(review): the predicate test guarding each round is not visible in
// this view.
1248 template <class P, class F>
1249 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1251 auto future = thunk();
// predicate and thunk are carried into the continuation for the next round.
1252 return future.then([
1253 predicate = std::forward<P>(predicate),
1254 thunk = std::forward<F>(thunk)
1256 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1259 return makeFuture();
// times(n, thunk): runs thunk through whileDo with a counting predicate that
// holds for the first n checks.
1263 Future<Unit> times(const int n, F&& thunk) {
1264 return folly::whileDo(
// The counter lives on the heap, owned by the predicate via unique_ptr;
// fetch_add returns the count before incrementing.
1265 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1266 return count->fetch_add(1) < n;
1268 std::forward<F>(thunk));
// map(first, last, func): chains func onto every future in the range and
// collects the resulting futures in iteration order. func is copied into
// each then() call.
1272 template <class It, class F, class ItT, class Result>
1273 std::vector<Future<Result>> map(It first, It last, F func) {
1274 std::vector<Future<Result>> results;
1275 for (auto it = first; it != last; it++) {
1276 results.push_back(it->then(func));
// Dispatch tags for the retrying() overloads: the raw tag selects the
// overload for policies returning bool, the fut tag the one for policies
// returning Future<bool> (see retrying_policy_traits).
1286 struct retrying_policy_raw_tag {};
1287 struct retrying_policy_fut_tag {};
// Classifies a retry policy by the signature of its operator().
1289 template <class Policy>
1290 struct retrying_policy_traits {
1291 using ew = exception_wrapper;
1292 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
// has_op<Ret>: true if Policy has operator()(size_t, const ew&) returning
// Ret, either const-qualified or not.
1293 template <class Ret>
1294 using has_op = typename std::integral_constant<bool,
1295 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1296 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
// Raw policies return bool; future policies return Future<bool>.
1297 using is_raw = has_op<bool>;
1298 using is_fut = has_op<Future<bool>>;
// tag: raw takes precedence over fut; void when neither signature matches.
1299 using tag = typename std::conditional<
1300 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1301 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// Performs one attempt of a retried operation and arranges what happens next.
//   k    - attempt index passed to ff.
//   p    - retry policy; forwarded into the continuation as `pm`.
//   ff   - the operation; ff(size_t) must return a future type F that exposes
//          F::value_type.
//   prom - promise that receives the final value or the final exception.
// NOTE(review): several short lines of this function (the call that attaches
// the continuation, the branch conditions and some captures) are not visible
// in this excerpt; the comments below describe only the visible statements.
1304 template <class Policy, class FF, class Prom>
1305 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1306 using F = typename std::result_of<FF(size_t)>::type;
1307 using T = typename F::value_type;
// Run the attempt. k is post-incremented, so after this line it holds the
// index of the next attempt.
// NOTE(review): presumably makeFutureWith turns an exception thrown by ff into
// a failed future - confirm.
1308 auto f = makeFutureWith([&] { return ff(k++); });
// Outer continuation: owns the promise, the policy and the operation.
1311 prom = std::move(prom),
1312 pm = std::forward<Policy>(p),
1313 ffm = std::forward<FF>(ff)
1314 ](Try<T> && t) mutable {
// Success path: hand the attempt's value to the promise.
1316 prom.setValue(std::move(t).value());
// Failure path: take the attempt's exception (consulting the policy happens on
// lines not visible here).
1319 auto& x = t.exception();
// Inner continuation: ownership of the promise and the operation moves on
// once more.
1323 prom = std::move(prom),
1326 ffm = std::move(ffm)
// Receives the policy's verdict.
1327 ](bool shouldRetry) mutable {
// Retry: start the next attempt with everything moved into the recursive call.
1329 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
// Give up: fail the promise with the stored exception `xm`.
1331 prom.setException(std::move(xm));
// Entry point of the retry machinery for policies that return a future.
//   k  - index of the first attempt.
//   p  - retry policy, forwarded on.
//   ff - the operation; ff(size_t) determines the returned future type.
// Creates a Promise<T>, takes its future before the promise is moved away,
// and passes k, p, ff and the promise to the implementation.
// NOTE(review): the callee name and the return statement are on lines not
// visible in this excerpt; presumably retryingImpl(...) followed by
// `return f;` - confirm.
1337 template <class Policy, class FF>
1338 typename std::result_of<FF(size_t)>::type
1339 retrying(size_t k, Policy&& p, FF&& ff) {
1340 using F = typename std::result_of<FF(size_t)>::type;
1341 using T = typename F::value_type;
1342 auto prom = Promise<T>();
1343 auto f = prom.getFuture();
1345 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
// Overload selected by retrying_policy_raw_tag.
// Adapts the policy into one returning Future<bool> by wrapping each verdict
// with makeFuture<bool>, then starts retrying at attempt index 0.
// NOTE(review): the adapter lambda is not `mutable`, so `pm` is const inside
// it and only a const call operator can be invoked; it also takes the
// exception_wrapper by value rather than by const reference - confirm both
// are intended.
1349 template <class Policy, class FF>
1350 typename std::result_of<FF(size_t)>::type
1351 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1352 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1353 return makeFuture<bool>(pm(k, x));
1355 return retrying(0, std::move(q), std::forward<FF>(ff));
// Overload selected by retrying_policy_fut_tag.
// No adaptation is needed, so the policy and the operation are forwarded
// unchanged to retrying(k, p, ff) with the attempt index starting at 0.
1358 template <class Policy, class FF>
1359 typename std::result_of<FF(size_t)>::type
1360 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1361 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1364 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
// Computes jitter * backoff_min * 2^(n - 1), where jitter is exp() of a sample
// from a normal distribution with mean 0 and standard deviation jitter_param,
// then clamps the result to the range given by backoff_min and backoff_max.
// NOTE(review): the declarations of `n`, `rng` and the alias `d` are on lines
// not visible in this excerpt; callers in this file pass a size_t for n and
// `d` is presumably an alias of Duration - confirm.
1365 template <class URNG>
1366 Duration retryingJitteredExponentialBackoffDur(
1368 Duration backoff_min,
1369 Duration backoff_max,
1370 double jitter_param,
1373 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1374 auto jitter = std::exp(dist(rng));
// NOTE(review): the floating-point product is converted to d::rep before the
// clamp on the next line is applied, so a large n could exceed the range of
// the representation at the point of conversion. If n is unsigned and 0,
// n - 1 would also wrap to a huge exponent. Confirm callers bound n.
1375 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
// When backoff_min > backoff_max the outer max makes backoff_min win.
1376 return std::max(backoff_min, std::min(backoff_max, backoff));
// Builds a retry policy that combines a future-returning inner policy with a
// jittered exponential backoff delay.
// The returned callable receives the attempt number `n` and the last
// exception, and completes with true (retry) or false (stop).
// NOTE(review): the parameter lines for max_tries, rng and p, the remaining
// captures and the inner branch condition are not visible in this excerpt.
1379 template <class Policy, class URNG>
1380 std::function<Future<bool>(size_t, const exception_wrapper&)>
1381 retryingPolicyCappedJitteredExponentialBackoff(
1383 Duration backoff_min,
1384 Duration backoff_max,
1385 double jitter_param,
// The closure owns the inner policy (pm) and the random generator (rngp).
1389 pm = std::forward<Policy>(p),
1394 rngp = std::forward<URNG>(rng)
1395 ](size_t n, const exception_wrapper& ex) mutable {
// Stop when the attempt number equals max_tries (equality, not >=).
1396 if (n == max_tries) {
1397 return makeFuture(false);
// Otherwise ask the inner policy and post-process its verdict.
// NOTE(review): rngp is moved out of this closure into the continuation each
// time the policy is invoked, so later invocations use a moved-from rngp;
// harmless only if URNG stays usable after a move - confirm.
1399 return pm(n, ex).then(
1400 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
// Stop-path of the continuation (its guarding condition is not visible here).
1403 return makeFuture(false);
// Retry-path: compute the delay for attempt n, sleep for it, then report true.
1405 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1406 n, backoff_min, backoff_max, jitter_param, rngp);
1407 return futures::sleep(backoff).then([] { return true; });
// Overload selected by retrying_policy_raw_tag.
// Wraps the policy in an adapter `q` that lifts each verdict into a future via
// makeFuture, then delegates to retryingPolicyCappedJitteredExponentialBackoff.
// NOTE(review): most arguments of the delegating call, and the parameter lines
// for max_tries, rng and p, are not visible in this excerpt.
1412 template <class Policy, class URNG>
1413 std::function<Future<bool>(size_t, const exception_wrapper&)>
1414 retryingPolicyCappedJitteredExponentialBackoff(
1416 Duration backoff_min,
1417 Duration backoff_max,
1418 double jitter_param,
1421 retrying_policy_raw_tag) {
// The adapter is not `mutable`, so pm is invoked as a const object.
1422 auto q = [pm = std::forward<Policy>(p)](
1423 size_t n, const exception_wrapper& e) {
1424 return makeFuture(pm(n, e));
1426 return retryingPolicyCappedJitteredExponentialBackoff(
1431 std::forward<URNG>(rng),
// Overload selected by retrying_policy_fut_tag.
// No adapter is needed, so the generator and the policy are forwarded
// unchanged to retryingPolicyCappedJitteredExponentialBackoff.
// NOTE(review): the leading arguments of the delegating call and some
// parameter lines are not visible in this excerpt.
1435 template <class Policy, class URNG>
1436 std::function<Future<bool>(size_t, const exception_wrapper&)>
1437 retryingPolicyCappedJitteredExponentialBackoff(
1439 Duration backoff_min,
1440 Duration backoff_max,
1441 double jitter_param,
1444 retrying_policy_fut_tag) {
1445 return retryingPolicyCappedJitteredExponentialBackoff(
1450 std::forward<URNG>(rng),
1451 std::forward<Policy>(p));
// Public entry point: retries the operation `ff` according to policy `p`.
//   p  - retry policy; classified through detail::retrying_policy_traits.
//   ff - the operation; ff(size_t) determines the returned future type.
// Dispatches to the matching detail::retrying overload by passing a
// default-constructed tag object as the last argument.
1455 template <class Policy, class FF>
1456 typename std::result_of<FF(size_t)>::type
1457 retrying(Policy&& p, FF&& ff) {
1458 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1459 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
// Returns a policy that ignores the exception and allows a retry while the
// attempt number n is below max_tries. The bound is captured by copy.
// NOTE(review): the parameter line declaring max_tries is not visible in this
// excerpt - confirm its type.
1463 std::function<bool(size_t, const exception_wrapper&)>
1464 retryingPolicyBasic(
1466 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
// Public entry point taking a user-supplied generator and inner policy.
// Classifies the policy through detail::retrying_policy_traits and delegates
// to the matching detail::retryingPolicyCappedJitteredExponentialBackoff
// overload.
// NOTE(review): the leading arguments of the delegating call, the trailing
// tag argument and some parameter lines are not visible in this excerpt.
1469 template <class Policy, class URNG>
1470 std::function<Future<bool>(size_t, const exception_wrapper&)>
1471 retryingPolicyCappedJitteredExponentialBackoff(
1473 Duration backoff_min,
1474 Duration backoff_max,
1475 double jitter_param,
1478 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1479 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1484 std::forward<URNG>(rng),
1485 std::forward<Policy>(p),
// Convenience overload without a user-supplied inner policy.
// Uses an inner policy that always answers true, then delegates to
// retryingPolicyCappedJitteredExponentialBackoff.
// NOTE(review): the arguments of the delegating call (including which random
// generator is supplied) are on lines not visible in this excerpt.
1490 std::function<Future<bool>(size_t, const exception_wrapper&)>
1491 retryingPolicyCappedJitteredExponentialBackoff(
1493 Duration backoff_min,
1494 Duration backoff_max,
1495 double jitter_param) {
1496 auto p = [](size_t, const exception_wrapper&) { return true; };
1497 return retryingPolicyCappedJitteredExponentialBackoff(
1508 // Instantiate the most common Future types to save compile time
// These are explicit instantiation declarations: they suppress implicit
// instantiation of the listed specializations in translation units that
// include this header. A matching explicit instantiation definition must
// exist in some translation unit (not visible in this excerpt).
1509 extern template class Future<Unit>;
1510 extern template class Future<bool>;
1511 extern template class Future<int>;
1512 extern template class Future<int64_t>;
1513 extern template class Future<std::string>;
1514 extern template class Future<double>;
1516 } // namespace folly