2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/Random.h>
27 #include <folly/Traits.h>
28 #include <folly/futures/detail/Core.h>
29 #include <folly/futures/Timekeeper.h>
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
43 #if FOLLY_FUTURE_USING_FIBER
44 typedef folly::fibers::Baton FutureBatonType;
46 typedef folly::Baton<> FutureBatonType;
51 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
53 // Guarantees that the stored functor is destructed before the stored promise
54 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
55 template <typename T, typename F>
56 class CoreCallbackState {
58 template <typename FF>
59 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
60 noexcept(F(std::declval<FF>())))
61 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
62 assert(before_barrier());
65 CoreCallbackState(CoreCallbackState&& that) noexcept(
66 noexcept(F(std::declval<F>()))) {
67 if (that.before_barrier()) {
68 new (&func_) F(std::move(that.func_));
69 promise_ = that.stealPromise();
73 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
75 ~CoreCallbackState() {
76 if (before_barrier()) {
81 template <typename... Args>
82 auto invoke(Args&&... args) noexcept(
83 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
84 assert(before_barrier());
85 return std::move(func_)(std::forward<Args>(args)...);
88 template <typename... Args>
89 auto tryInvoke(Args&&... args) noexcept {
90 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
93 void setTry(Try<T>&& t) {
94 stealPromise().setTry(std::move(t));
97 void setException(exception_wrapper&& ew) {
98 stealPromise().setException(std::move(ew));
101 Promise<T> stealPromise() noexcept {
102 assert(before_barrier());
104 return std::move(promise_);
108 bool before_barrier() const noexcept {
109 return !promise_.isFulfilled();
115 Promise<T> promise_{detail::EmptyConstruct{}};
118 template <typename T, typename F>
119 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
120 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
121 std::declval<Promise<T>&&>(),
122 std::declval<F&&>()))) {
123 return CoreCallbackState<T, _t<std::decay<F>>>(
124 std::move(p), std::forward<F>(f));
129 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
130 other.core_ = nullptr;
134 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
135 std::swap(core_, other.core_);
140 template <class T2, typename>
141 Future<T>::Future(T2&& val)
142 : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
145 template <typename T2>
146 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
147 : core_(new detail::Core<T>(Try<T>(T()))) {}
150 Future<T>::~Future() {
155 void Future<T>::detach() {
157 core_->detachFuture();
163 void Future<T>::throwIfInvalid() const {
170 void Future<T>::setCallback_(F&& func) {
172 core_->setCallback(std::forward<F>(func));
179 typename std::enable_if<isFuture<F>::value,
180 Future<typename isFuture<T>::Inner>>::type
181 Future<T>::unwrap() {
182 return then([](Future<typename isFuture<T>::Inner> internal_future) {
183 return internal_future;
189 // Variant: returns a value
190 // e.g. f.then([](Try<T>&& t){ return t.value(); });
192 template <typename F, typename R, bool isTry, typename... Args>
193 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
194 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
195 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
196 typedef typename R::ReturnsFuture::Inner B;
201 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
203 // grab the Future now before we lose our handle on the Promise
204 auto f = p.getFuture();
205 f.core_->setExecutorNoLock(getExecutor());
207 /* This is a bit tricky.
209 We can't just close over *this in case this Future gets moved. So we
210 make a new dummy Future. We could figure out something more
211 sophisticated that avoids making a new Future object when it can, as an
212 optimization. But this is correct.
214 core_ can't be moved, it is explicitly disallowed (as is copying). But
215 if there's ever a reason to allow it, this is one place that makes that
216 assumption and would need to be fixed. We use a standard shared pointer
217 for core_ (by copying it in), which means in essence obj holds a shared
218 pointer to itself. But this shouldn't leak because Promise will not
219 outlive the continuation, because Promise will setException() with a
220 broken Promise if it is destructed before completed. We could use a
221 weak pointer but it would have to be converted to a shared pointer when
222 func is executed (because the Future returned by func may possibly
223 persist beyond the callback, if it gets moved), and so it is an
224 optimization to just make it shared from the get-go.
226 Two subtle but important points about this design. detail::Core has no
227 back pointers to Future or Promise, so if Future or Promise get moved
228 (and they will be moved in performant code) we don't have to do
229 anything fancy. And because we store the continuation in the
230 detail::Core, not in the Future, we can execute the continuation even
231 after the Future has gone out of scope. This is an intentional design
232 decision. It is likely we will want to be able to cancel a continuation
233 in some circumstances, but I think it should be explicit not implicit
234 in the destruction of the Future used to create it.
237 [state = detail::makeCoreCallbackState(
238 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
239 if (!isTry && t.hasException()) {
240 state.setException(std::move(t.exception()));
242 state.setTry(makeTryWith(
243 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
250 // Variant: returns a Future
251 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
253 template <typename F, typename R, bool isTry, typename... Args>
254 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
255 Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
256 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
257 typedef typename R::ReturnsFuture::Inner B;
262 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
264 // grab the Future now before we lose our handle on the Promise
265 auto f = p.getFuture();
266 f.core_->setExecutorNoLock(getExecutor());
269 [state = detail::makeCoreCallbackState(
270 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
271 if (!isTry && t.hasException()) {
272 state.setException(std::move(t.exception()));
274 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
275 if (tf2.hasException()) {
276 state.setException(std::move(tf2.exception()));
278 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
279 p.setTry(std::move(b));
288 template <typename T>
289 template <typename R, typename Caller, typename... Args>
290 Future<typename isFuture<R>::Inner>
291 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
292 typedef typename std::remove_cv<
293 typename std::remove_reference<
294 typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
295 return then([instance, func](Try<T>&& t){
296 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
301 Future<Unit> Future<T>::then() {
302 return then([] () {});
305 // onError where the callback returns T
308 typename std::enable_if<
309 !detail::callableWith<F, exception_wrapper>::value &&
310 !detail::Extract<F>::ReturnsFuture::value,
312 Future<T>::onError(F&& func) {
313 typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
315 std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
316 "Return type of onError callback must be T or Future<T>");
319 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
320 auto f = p.getFuture();
323 [state = detail::makeCoreCallbackState(
324 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
325 if (auto e = t.template tryGetExceptionObject<Exn>()) {
326 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
328 state.setTry(std::move(t));
335 // onError where the callback returns Future<T>
338 typename std::enable_if<
339 !detail::callableWith<F, exception_wrapper>::value &&
340 detail::Extract<F>::ReturnsFuture::value,
342 Future<T>::onError(F&& func) {
344 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
345 "Return type of onError callback must be T or Future<T>");
346 typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
349 auto f = p.getFuture();
352 [state = detail::makeCoreCallbackState(
353 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
354 if (auto e = t.template tryGetExceptionObject<Exn>()) {
355 auto tf2 = state.tryInvoke(*e);
356 if (tf2.hasException()) {
357 state.setException(std::move(tf2.exception()));
359 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
360 p.setTry(std::move(t3));
364 state.setTry(std::move(t));
373 Future<T> Future<T>::ensure(F&& func) {
374 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
376 return makeFuture(std::move(t));
382 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
383 return within(dur, tk).onError([funcw = std::forward<F>(func)](
384 TimedOut const&) { return std::move(funcw)(); });
389 typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
390 detail::Extract<F>::ReturnsFuture::value,
392 Future<T>::onError(F&& func) {
394 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
395 "Return type of onError callback must be T or Future<T>");
398 auto f = p.getFuture();
400 [state = detail::makeCoreCallbackState(
401 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
402 if (t.hasException()) {
403 auto tf2 = state.tryInvoke(std::move(t.exception()));
404 if (tf2.hasException()) {
405 state.setException(std::move(tf2.exception()));
407 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
408 p.setTry(std::move(t3));
412 state.setTry(std::move(t));
419 // onError(exception_wrapper) that returns T
422 typename std::enable_if<
423 detail::callableWith<F, exception_wrapper>::value &&
424 !detail::Extract<F>::ReturnsFuture::value,
426 Future<T>::onError(F&& func) {
428 std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
429 "Return type of onError callback must be T or Future<T>");
432 auto f = p.getFuture();
434 [state = detail::makeCoreCallbackState(
435 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
436 if (t.hasException()) {
437 state.setTry(makeTryWith(
438 [&] { return state.invoke(std::move(t.exception())); }));
440 state.setTry(std::move(t));
448 typename std::add_lvalue_reference<T>::type Future<T>::value() {
451 return core_->getTry().value();
455 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
458 return core_->getTry().value();
462 Try<T>& Future<T>::getTry() {
465 return core_->getTry();
469 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
470 return waitVia(e).getTry();
474 Optional<Try<T>> Future<T>::poll() {
476 if (core_->ready()) {
477 o = std::move(core_->getTry());
483 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
486 setExecutor(executor, priority);
488 return std::move(*this);
492 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
496 auto f = p.getFuture();
497 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
498 return std::move(f).via(executor, priority);
501 template <class Func>
502 auto via(Executor* x, Func&& func)
503 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
504 // TODO make this actually more performant. :-P #7260175
505 return via(x).then(std::forward<Func>(func));
509 bool Future<T>::isReady() const {
511 return core_->ready();
515 bool Future<T>::hasValue() {
516 return getTry().hasValue();
520 bool Future<T>::hasException() {
521 return getTry().hasException();
525 void Future<T>::raise(exception_wrapper exception) {
526 core_->raise(std::move(exception));
532 Future<typename std::decay<T>::type> makeFuture(T&& t) {
533 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
536 inline // for multiple translation units
537 Future<Unit> makeFuture() {
538 return makeFuture(Unit{});
541 // makeFutureWith(Future<T>()) -> Future<T>
543 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
544 typename std::result_of<F()>::type>::type
545 makeFutureWith(F&& func) {
547 typename isFuture<typename std::result_of<F()>::type>::Inner;
549 return std::forward<F>(func)();
550 } catch (std::exception& e) {
551 return makeFuture<InnerType>(
552 exception_wrapper(std::current_exception(), e));
554 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
558 // makeFutureWith(T()) -> Future<T>
559 // makeFutureWith(void()) -> Future<Unit>
561 typename std::enable_if<
562 !(isFuture<typename std::result_of<F()>::type>::value),
563 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
564 makeFutureWith(F&& func) {
566 typename Unit::Lift<typename std::result_of<F()>::type>::type;
567 return makeFuture<LiftedResult>(
568 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
572 Future<T> makeFuture(std::exception_ptr const& e) {
573 return makeFuture(Try<T>(e));
577 Future<T> makeFuture(exception_wrapper ew) {
578 return makeFuture(Try<T>(std::move(ew)));
581 template <class T, class E>
582 typename std::enable_if<std::is_base_of<std::exception, E>::value,
584 makeFuture(E const& e) {
585 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
589 Future<T> makeFuture(Try<T>&& t) {
590 return Future<T>(new detail::Core<T>(std::move(t)));
594 Future<Unit> via(Executor* executor, int8_t priority) {
595 return makeFuture().via(executor, priority);
598 // mapSetCallback calls func(i, Try<T>) when every future completes
600 template <class T, class InputIterator, class F>
601 void mapSetCallback(InputIterator first, InputIterator last, F func) {
602 for (size_t i = 0; first != last; ++first, ++i) {
603 first->setCallback_([func, i](Try<T>&& t) {
604 func(i, std::move(t));
609 // collectAll (variadic)
611 template <typename... Fs>
612 typename detail::CollectAllVariadicContext<
613 typename std::decay<Fs>::type::value_type...>::type
614 collectAll(Fs&&... fs) {
615 auto ctx = std::make_shared<detail::CollectAllVariadicContext<
616 typename std::decay<Fs>::type::value_type...>>();
617 detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
618 ctx, std::forward<Fs>(fs)...);
619 return ctx->p.getFuture();
622 // collectAll (iterator)
624 template <class InputIterator>
627 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
628 collectAll(InputIterator first, InputIterator last) {
630 typename std::iterator_traits<InputIterator>::value_type::value_type T;
632 struct CollectAllContext {
633 CollectAllContext(size_t n) : results(n) {}
634 ~CollectAllContext() {
635 p.setValue(std::move(results));
637 Promise<std::vector<Try<T>>> p;
638 std::vector<Try<T>> results;
642 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
643 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
644 ctx->results[i] = std::move(t);
646 return ctx->p.getFuture();
649 // collect (iterator)
653 template <typename T>
654 struct CollectContext {
656 explicit Nothing(int /* n */) {}
659 using Result = typename std::conditional<
660 std::is_void<T>::value,
662 std::vector<T>>::type;
664 using InternalResult = typename std::conditional<
665 std::is_void<T>::value,
667 std::vector<Optional<T>>>::type;
669 explicit CollectContext(size_t n) : result(n) {}
671 if (!threw.exchange(true)) {
672 // map Optional<T> -> T
673 std::vector<T> finalResult;
674 finalResult.reserve(result.size());
675 std::transform(result.begin(), result.end(),
676 std::back_inserter(finalResult),
677 [](Optional<T>& o) { return std::move(o.value()); });
678 p.setValue(std::move(finalResult));
681 inline void setPartialResult(size_t i, Try<T>& t) {
682 result[i] = std::move(t.value());
685 InternalResult result;
686 std::atomic<bool> threw {false};
691 template <class InputIterator>
692 Future<typename detail::CollectContext<
693 typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
694 collect(InputIterator first, InputIterator last) {
696 typename std::iterator_traits<InputIterator>::value_type::value_type T;
698 auto ctx = std::make_shared<detail::CollectContext<T>>(
699 std::distance(first, last));
700 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
701 if (t.hasException()) {
702 if (!ctx->threw.exchange(true)) {
703 ctx->p.setException(std::move(t.exception()));
705 } else if (!ctx->threw) {
706 ctx->setPartialResult(i, t);
709 return ctx->p.getFuture();
712 // collect (variadic)
714 template <typename... Fs>
715 typename detail::CollectVariadicContext<
716 typename std::decay<Fs>::type::value_type...>::type
717 collect(Fs&&... fs) {
718 auto ctx = std::make_shared<detail::CollectVariadicContext<
719 typename std::decay<Fs>::type::value_type...>>();
720 detail::collectVariadicHelper<detail::CollectVariadicContext>(
721 ctx, std::forward<Fs>(fs)...);
722 return ctx->p.getFuture();
725 // collectAny (iterator)
727 template <class InputIterator>
732 std::iterator_traits<InputIterator>::value_type::value_type>>>
733 collectAny(InputIterator first, InputIterator last) {
735 typename std::iterator_traits<InputIterator>::value_type::value_type T;
737 struct CollectAnyContext {
738 CollectAnyContext() {}
739 Promise<std::pair<size_t, Try<T>>> p;
740 std::atomic<bool> done {false};
743 auto ctx = std::make_shared<CollectAnyContext>();
744 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
745 if (!ctx->done.exchange(true)) {
746 ctx->p.setValue(std::make_pair(i, std::move(t)));
749 return ctx->p.getFuture();
752 // collectAnyWithoutException (iterator)
754 template <class InputIterator>
757 typename std::iterator_traits<InputIterator>::value_type::value_type>>
758 collectAnyWithoutException(InputIterator first, InputIterator last) {
760 typename std::iterator_traits<InputIterator>::value_type::value_type T;
762 struct CollectAnyWithoutExceptionContext {
763 CollectAnyWithoutExceptionContext(){}
764 Promise<std::pair<size_t, T>> p;
765 std::atomic<bool> done{false};
766 std::atomic<size_t> nFulfilled{0};
770 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
771 ctx->nTotal = size_t(std::distance(first, last));
773 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
774 if (!t.hasException() && !ctx->done.exchange(true)) {
775 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
776 } else if (++ctx->nFulfilled == ctx->nTotal) {
777 ctx->p.setException(t.exception());
780 return ctx->p.getFuture();
783 // collectN (iterator)
785 template <class InputIterator>
786 Future<std::vector<std::pair<size_t, Try<typename
787 std::iterator_traits<InputIterator>::value_type::value_type>>>>
788 collectN(InputIterator first, InputIterator last, size_t n) {
790 std::iterator_traits<InputIterator>::value_type::value_type T;
791 typedef std::vector<std::pair<size_t, Try<T>>> V;
793 struct CollectNContext {
795 std::atomic<size_t> completed = {0};
798 auto ctx = std::make_shared<CollectNContext>();
800 if (size_t(std::distance(first, last)) < n) {
801 ctx->p.setException(std::runtime_error("Not enough futures"));
803 // for each completed Future, increase count and add to vector, until we
804 // have n completed futures at which point we fulfil our Promise with the
806 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
807 auto c = ++ctx->completed;
809 assert(ctx->v.size() < n);
810 ctx->v.emplace_back(i, std::move(t));
812 ctx->p.setTry(Try<V>(std::move(ctx->v)));
818 return ctx->p.getFuture();
823 template <class It, class T, class F>
824 Future<T> reduce(It first, It last, T&& initial, F&& func) {
826 return makeFuture(std::move(initial));
829 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
831 typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
834 typedef isTry<Arg> IsTry;
836 auto sfunc = std::make_shared<F>(std::move(func));
838 auto f = first->then(
839 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
841 std::move(minitial), head.template get<IsTry::value, Arg&&>());
844 for (++first; first != last; ++first) {
845 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
846 return (*sfunc)(std::move(std::get<0>(t).value()),
847 // Either return a ItT&& or a Try<ItT>&& depending
848 // on the type of the argument of func.
849 std::get<1>(t).template get<IsTry::value, Arg&&>());
856 // window (collection)
858 template <class Collection, class F, class ItT, class Result>
859 std::vector<Future<Result>>
860 window(Collection input, F func, size_t n) {
861 struct WindowContext {
862 WindowContext(Collection&& i, F&& fn)
863 : input_(std::move(i)), promises_(input_.size()),
866 std::atomic<size_t> i_ {0};
868 std::vector<Promise<Result>> promises_;
871 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
872 size_t i = ctx->i_++;
873 if (i < ctx->input_.size()) {
874 // Using setCallback_ directly since we don't need the Future
875 ctx->func_(std::move(ctx->input_[i])).setCallback_(
876 // ctx is captured by value
877 [ctx, i](Try<Result>&& t) {
878 ctx->promises_[i].setTry(std::move(t));
879 // Chain another future onto this one
880 spawn(std::move(ctx));
886 auto max = std::min(n, input.size());
888 auto ctx = std::make_shared<WindowContext>(
889 std::move(input), std::move(func));
891 for (size_t i = 0; i < max; ++i) {
892 // Start the first n Futures
893 WindowContext::spawn(ctx);
896 std::vector<Future<Result>> futures;
897 futures.reserve(ctx->promises_.size());
898 for (auto& promise : ctx->promises_) {
899 futures.emplace_back(promise.getFuture());
908 template <class I, class F>
909 Future<I> Future<T>::reduce(I&& initial, F&& func) {
911 minitial = std::forward<I>(initial),
912 mfunc = std::forward<F>(func)
914 auto ret = std::move(minitial);
915 for (auto& val : vals) {
916 ret = mfunc(std::move(ret), std::move(val));
922 // unorderedReduce (iterator)
924 template <class It, class T, class F, class ItT, class Arg>
925 Future<T> unorderedReduce(It first, It last, T initial, F func) {
927 return makeFuture(std::move(initial));
930 typedef isTry<Arg> IsTry;
932 struct UnorderedReduceContext {
933 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
934 : lock_(), memo_(makeFuture<T>(std::move(memo))),
935 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
937 folly::MicroSpinLock lock_; // protects memo_ and numThens_
940 size_t numThens_; // how many Futures completed and called .then()
941 size_t numFutures_; // how many Futures in total
945 auto ctx = std::make_shared<UnorderedReduceContext>(
946 std::move(initial), std::move(func), std::distance(first, last));
951 [ctx](size_t /* i */, Try<ItT>&& t) {
952 // Futures can be completed in any order, simultaneously.
953 // To make this non-blocking, we create a new Future chain in
954 // the order of completion to reduce the values.
955 // The spinlock just protects chaining a new Future, not actually
956 // executing the reduce, which should be really fast.
957 folly::MSLGuard lock(ctx->lock_);
959 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
960 // Either return a ItT&& or a Try<ItT>&& depending
961 // on the type of the argument of func.
962 return ctx->func_(std::move(v),
963 mt.template get<IsTry::value, Arg&&>());
965 if (++ctx->numThens_ == ctx->numFutures_) {
966 // After reducing the value of the last Future, fulfill the Promise
967 ctx->memo_.setCallback_(
968 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
972 return ctx->promise_.getFuture();
978 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
979 return within(dur, TimedOut(), tk);
984 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
987 Context(E ex) : exception(std::move(ex)), promise() {}
989 Future<Unit> thisFuture;
991 std::atomic<bool> token {false};
994 std::shared_ptr<Timekeeper> tks;
996 tks = folly::detail::getTimekeeperSingleton();
997 tk = DCHECK_NOTNULL(tks.get());
1000 auto ctx = std::make_shared<Context>(std::move(e));
1002 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1003 // TODO: "this" completed first, cancel "after"
1004 if (ctx->token.exchange(true) == false) {
1005 ctx->promise.setTry(std::move(t));
1009 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
1010 // "after" completed first, cancel "this"
1011 ctx->thisFuture.raise(TimedOut());
1012 if (ctx->token.exchange(true) == false) {
1013 if (t.hasException()) {
1014 ctx->promise.setException(std::move(t.exception()));
1016 ctx->promise.setException(std::move(ctx->exception));
1021 return ctx->promise.getFuture().via(getExecutor());
1027 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1028 return collectAll(*this, futures::sleep(dur, tk))
1029 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1030 Try<T>& t = std::get<0>(tup);
1031 return makeFuture<T>(std::move(t));
1038 void waitImpl(Future<T>& f) {
1039 // short-circuit if there's nothing to do
1040 if (f.isReady()) return;
1042 FutureBatonType baton;
1043 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1045 assert(f.isReady());
1049 void waitImpl(Future<T>& f, Duration dur) {
1050 // short-circuit if there's nothing to do
1056 auto ret = promise.getFuture();
1057 auto baton = std::make_shared<FutureBatonType>();
1058 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1059 promise.setTry(std::move(t));
1063 if (baton->timed_wait(dur)) {
1064 assert(f.isReady());
1069 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1070 // Set callback so to ensure that the via executor has something on it
1071 // so that once the preceding future triggers this callback, drive will
1072 // always have a callback to satisfy it
1075 f = f.via(e).then([](T&& t) { return std::move(t); });
1076 while (!f.isReady()) {
1079 assert(f.isReady());
1085 Future<T>& Future<T>::wait() & {
1086 detail::waitImpl(*this);
1091 Future<T>&& Future<T>::wait() && {
1092 detail::waitImpl(*this);
1093 return std::move(*this);
1097 Future<T>& Future<T>::wait(Duration dur) & {
1098 detail::waitImpl(*this, dur);
1103 Future<T>&& Future<T>::wait(Duration dur) && {
1104 detail::waitImpl(*this, dur);
1105 return std::move(*this);
1109 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1110 detail::waitViaImpl(*this, e);
1115 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1116 detail::waitViaImpl(*this, e);
1117 return std::move(*this);
1121 T Future<T>::get() {
1122 return std::move(wait().value());
1126 T Future<T>::get(Duration dur) {
1129 return std::move(value());
1136 T Future<T>::getVia(DrivableExecutor* e) {
1137 return std::move(waitVia(e).value());
1143 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1144 return t1.value() == t2.value();
1150 Future<bool> Future<T>::willEqual(Future<T>& f) {
1151 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1152 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1153 return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
1162 Future<T> Future<T>::filter(F&& predicate) {
1163 return this->then([p = std::forward<F>(predicate)](T val) {
1164 T const& valConstRef = val;
1165 if (!p(valConstRef)) {
1166 throw PredicateDoesNotObtain();
1173 inline Future<Unit> when(bool p, F&& thunk) {
1174 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1177 template <class P, class F>
1178 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1180 auto future = thunk();
1181 return future.then([
1182 predicate = std::forward<P>(predicate),
1183 thunk = std::forward<F>(thunk)
1185 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1188 return makeFuture();
1192 Future<Unit> times(const int n, F&& thunk) {
1193 return folly::whileDo(
1194 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1195 return count->fetch_add(1) < n;
1197 std::forward<F>(thunk));
1201 template <class It, class F, class ItT, class Result>
1202 std::vector<Future<Result>> map(It first, It last, F func) {
1203 std::vector<Future<Result>> results;
1204 for (auto it = first; it != last; it++) {
1205 results.push_back(it->then(func));
1215 struct retrying_policy_raw_tag {};
1216 struct retrying_policy_fut_tag {};
1218 template <class Policy>
1219 struct retrying_policy_traits {
1220 using ew = exception_wrapper;
1221 FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
1222 template <class Ret>
1223 using has_op = typename std::integral_constant<bool,
1224 has_op_call<Policy, Ret(size_t, const ew&)>::value ||
1225 has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
1226 using is_raw = has_op<bool>;
1227 using is_fut = has_op<Future<bool>>;
1228 using tag = typename std::conditional<
1229 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1230 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
1233 template <class Policy, class FF, class Prom>
1234 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1235 using F = typename std::result_of<FF(size_t)>::type;
1236 using T = typename F::value_type;
1237 auto f = makeFutureWith([&] { return ff(k++); });
1240 prom = std::move(prom),
1241 pm = std::forward<Policy>(p),
1242 ffm = std::forward<FF>(ff)
1243 ](Try<T> && t) mutable {
1245 prom.setValue(std::move(t).value());
1248 auto& x = t.exception();
1252 prom = std::move(prom),
1255 ffm = std::move(ffm)
1256 ](bool shouldRetry) mutable {
1258 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1260 prom.setException(std::move(xm));
1266 template <class Policy, class FF>
1267 typename std::result_of<FF(size_t)>::type
1268 retrying(size_t k, Policy&& p, FF&& ff) {
1269 using F = typename std::result_of<FF(size_t)>::type;
1270 using T = typename F::value_type;
1271 auto prom = Promise<T>();
1272 auto f = prom.getFuture();
1274 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
1278 template <class Policy, class FF>
1279 typename std::result_of<FF(size_t)>::type
1280 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1281 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1282 return makeFuture<bool>(pm(k, x));
1284 return retrying(0, std::move(q), std::forward<FF>(ff));
1287 template <class Policy, class FF>
1288 typename std::result_of<FF(size_t)>::type
1289 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1290 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1293 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1294 template <class URNG>
1295 Duration retryingJitteredExponentialBackoffDur(
1297 Duration backoff_min,
1298 Duration backoff_max,
1299 double jitter_param,
1302 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1303 auto jitter = std::exp(dist(rng));
1304 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1305 return std::max(backoff_min, std::min(backoff_max, backoff));
1308 template <class Policy, class URNG>
1309 std::function<Future<bool>(size_t, const exception_wrapper&)>
1310 retryingPolicyCappedJitteredExponentialBackoff(
1312 Duration backoff_min,
1313 Duration backoff_max,
1314 double jitter_param,
1318 pm = std::forward<Policy>(p),
1323 rngp = std::forward<URNG>(rng)
1324 ](size_t n, const exception_wrapper& ex) mutable {
1325 if (n == max_tries) {
1326 return makeFuture(false);
1328 return pm(n, ex).then(
1329 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1332 return makeFuture(false);
1334 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1335 n, backoff_min, backoff_max, jitter_param, rngp);
1336 return futures::sleep(backoff).then([] { return true; });
1341 template <class Policy, class URNG>
1342 std::function<Future<bool>(size_t, const exception_wrapper&)>
1343 retryingPolicyCappedJitteredExponentialBackoff(
1345 Duration backoff_min,
1346 Duration backoff_max,
1347 double jitter_param,
1350 retrying_policy_raw_tag) {
1351 auto q = [pm = std::forward<Policy>(p)](
1352 size_t n, const exception_wrapper& e) {
1353 return makeFuture(pm(n, e));
1355 return retryingPolicyCappedJitteredExponentialBackoff(
1360 std::forward<URNG>(rng),
1364 template <class Policy, class URNG>
1365 std::function<Future<bool>(size_t, const exception_wrapper&)>
1366 retryingPolicyCappedJitteredExponentialBackoff(
1368 Duration backoff_min,
1369 Duration backoff_max,
1370 double jitter_param,
1373 retrying_policy_fut_tag) {
1374 return retryingPolicyCappedJitteredExponentialBackoff(
1379 std::forward<URNG>(rng),
1380 std::forward<Policy>(p));
1384 template <class Policy, class FF>
1385 typename std::result_of<FF(size_t)>::type
1386 retrying(Policy&& p, FF&& ff) {
1387 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1388 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
1392 std::function<bool(size_t, const exception_wrapper&)>
1393 retryingPolicyBasic(
1395 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
1398 template <class Policy, class URNG>
1399 std::function<Future<bool>(size_t, const exception_wrapper&)>
1400 retryingPolicyCappedJitteredExponentialBackoff(
1402 Duration backoff_min,
1403 Duration backoff_max,
1404 double jitter_param,
1407 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1408 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1413 std::forward<URNG>(rng),
1414 std::forward<Policy>(p),
1419 std::function<Future<bool>(size_t, const exception_wrapper&)>
1420 retryingPolicyCappedJitteredExponentialBackoff(
1422 Duration backoff_min,
1423 Duration backoff_max,
1424 double jitter_param) {
1425 auto p = [](size_t, const exception_wrapper&) { return true; };
1426 return retryingPolicyCappedJitteredExponentialBackoff(
1437 // Instantiate the most common Future types to save compile time
1438 extern template class Future<Unit>;
1439 extern template class Future<bool>;
1440 extern template class Future<int>;
1441 extern template class Future<int64_t>;
1442 extern template class Future<std::string>;
1443 extern template class Future<double>;
1445 } // namespace folly