2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
25 #include <folly/Baton.h>
26 #include <folly/Optional.h>
27 #include <folly/Random.h>
28 #include <folly/futures/Timekeeper.h>
29 #include <folly/futures/detail/Core.h>
// Chooses which baton type the blocking wait helpers in this file use.
// FOLLY_FUTURE_USING_FIBER is 0 on mobile and Apple builds; the other
// definition sets it to 1 and pulls in the fiber baton header.
31 #if FOLLY_MOBILE || defined(__APPLE__)
32 #define FOLLY_FUTURE_USING_FIBER 0
34 #define FOLLY_FUTURE_USING_FIBER 1
35 #include <folly/fibers/Baton.h>
// FutureBatonType is folly::fibers::Baton when the macro is non-zero,
// presumably folly::Baton<> otherwise.
44 #if FOLLY_FUTURE_USING_FIBER
45 typedef folly::fibers::Baton FutureBatonType;
47 typedef folly::Baton<> FutureBatonType;
50 } // namespace futures
// Declaration only: hands out a shared_ptr to a Timekeeper. Future<T>::within
// calls it to obtain a Timekeeper (presumably when the caller passed none).
53 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
58 // Guarantees that the stored functor is destructed before the stored promise
59 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
//
// Holds a continuation functor (func_) together with the promise it will
// complete (promise_). "Before the barrier" means the promise has not been
// fulfilled yet; see before_barrier().
60 template <typename T, typename F>
61 class CoreCallbackState {
// Stores the forwarded functor and takes over the promise. The promise must
// still be unfulfilled (asserted). noexcept iff constructing F from FF is.
63 template <typename FF>
64 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
65 noexcept(F(std::declval<FF>())))
66 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
67 assert(before_barrier());
// Move constructor: only when `that` is still before the barrier is the
// functor move-constructed in place and the promise taken from `that`;
// otherwise promise_ keeps its empty default initializer.
70 CoreCallbackState(CoreCallbackState&& that) noexcept(
71 noexcept(F(std::declval<F>()))) {
72 if (that.before_barrier()) {
73 new (&func_) F(std::move(that.func_));
74 promise_ = that.stealPromise();
// Move assignment is intentionally unavailable.
78 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
// Destructor: only acts while before the barrier.
// NOTE(review): the guarded statement is not shown here; presumably it calls
// stealPromise() so the functor is torn down first. Confirm.
80 ~CoreCallbackState() {
81 if (before_barrier()) {
// Calls the stored functor as an rvalue with the forwarded arguments.
// Requires that the promise is still unfulfilled (asserted).
86 template <typename... Args>
87 auto invoke(Args&&... args) noexcept(
88 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
89 assert(before_barrier());
90 return std::move(func_)(std::forward<Args>(args)...);
// Like invoke(), but routes the call through makeTryWith and returns its
// result instead of letting the call propagate directly.
93 template <typename... Args>
94 auto tryInvoke(Args&&... args) noexcept {
95 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
// Fulfils the promise with t after taking the promise out of this state.
98 void setTry(Try<T>&& t) {
99 stealPromise().setTry(std::move(t));
// Fails the promise with ew after taking the promise out of this state.
102 void setException(exception_wrapper&& ew) {
103 stealPromise().setException(std::move(ew));
// Moves the promise out of this object; only valid before the barrier.
// NOTE(review): a statement between the assert and the return is not shown;
// presumably it destroys func_. Confirm.
106 Promise<T> stealPromise() noexcept {
107 assert(before_barrier());
109 return std::move(promise_);
// True while the stored promise has not been fulfilled.
113 bool before_barrier() const noexcept {
114 return !promise_.isFulfilled();
// Starts out as an empty promise until a constructor assigns a real one.
120 Promise<T> promise_{Promise<T>::makeEmpty()};
// Factory that deduces the functor type: builds a
// CoreCallbackState<T, decay_t<F>> from a promise (moved in) and a forwarded
// functor. The noexcept specification mirrors that constructor call.
123 template <typename T, typename F>
124 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
125 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
126 std::declval<Promise<T>&&>(),
127 std::declval<F&&>()))) {
128 return CoreCallbackState<T, _t<std::decay<F>>>(
129 std::move(p), std::forward<F>(f));
131 } // namespace detail
132 } // namespace futures
// Returns a Future built through the EmptyConstruct tag constructor.
135 Future<T> Future<T>::makeEmpty() {
136 return Future<T>(futures::detail::EmptyConstruct{});
// Move constructor: adopts other's core pointer and nulls it in the source.
140 Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
141 other.core_ = nullptr;
// Move assignment: swaps core pointers, so `other` ends up holding whatever
// core this Future had before the assignment.
145 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
146 std::swap(core_, other.core_);
// Converting move constructor, enabled when T is not decay_t<T2>, T is
// constructible from T2&& and T2&& is implicitly convertible to T.
// Chains a continuation on `other` that builds a T from the moved T2 value.
153 typename std::enable_if<
154 !std::is_same<T, typename std::decay<T2>::type>::value &&
155 std::is_constructible<T, T2&&>::value &&
156 std::is_convertible<T2&&, T>::value,
158 Future<T>::Future(Future<T2>&& other)
159 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Same conversion as the overload above, but enabled when T is constructible
// from T2&& while T2&& is NOT implicitly convertible to T (presumably the
// explicit overload; the declaration is outside this file section).
164 typename std::enable_if<
165 !std::is_same<T, typename std::decay<T2>::type>::value &&
166 std::is_constructible<T, T2&&>::value &&
167 !std::is_convertible<T2&&, T>::value,
169 Future<T>::Future(Future<T2>&& other)
170 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Converting move assignment, enabled when T is not decay_t<T2> and T is
// constructible from T2&&. Uses a continuation on `other` that builds a T
// from the moved T2 value.
175 typename std::enable_if<
176 !std::is_same<T, typename std::decay<T2>::type>::value &&
177 std::is_constructible<T, T2&&>::value,
179 Future<T>& Future<T>::operator=(Future<T2>&& other) {
181 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
// Value constructor: allocates a Core that already holds Try<T> built from
// the forwarded value.
185 template <class T2, typename>
186 Future<T>::Future(T2&& val)
187 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Constructor usable only when T2 is Unit (enable_if on the dummy pointer
// parameter): allocates a Core holding a value-initialized T.
190 template <typename T2>
191 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
192 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
// In-place constructor, enabled when T is constructible from Args&&...:
// allocates a Core and forwards the arguments to it with the in_place tag.
197 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
199 Future<T>::Future(in_place_t, Args&&... args)
201 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
// Destructor. NOTE(review): body not shown here; presumably it calls
// detach(). Confirm.
205 Future<T>::~Future() {
// Tells the core that this Future no longer refers to it.
// NOTE(review): any null check or reset of core_ is not shown; confirm.
210 void Future<T>::detach() {
212 core_->detachFuture();
// Validity check. NOTE(review): body not shown; presumably throws when this
// Future has no core. Confirm.
218 void Future<T>::throwIfInvalid() const {
// Installs func as the core's callback, forwarding it unchanged.
225 void Future<T>::setCallback_(F&& func) {
227 core_->setCallback(std::forward<F>(func));
// Flattens a Future<Future<Inner>> into Future<Inner>; only enabled when F is
// itself a Future type. Implemented as a then() whose callback returns the
// inner future unchanged.
234 typename std::enable_if<isFuture<F>::value,
235 Future<typename isFuture<T>::Inner>>::type
236 Future<T>::unwrap() {
237 return then([](Future<typename isFuture<T>::Inner> internal_future) {
238 return internal_future;
244 // Variant: returns a value
245 // e.g. f.then([](Try<T>&& t){ return t.value(); });
//
// Selected when the callback does not return a Future
// (R::ReturnsFuture::value is false). Installs a continuation that runs func
// and completes a new promise with the outcome.
247 template <typename F, typename R, bool isTry, typename... Args>
248 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
249 Future<T>::thenImplementation(
251 futures::detail::argResult<isTry, F, Args...>) {
252 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
253 typedef typename R::ReturnsFuture::Inner B;
// The new promise inherits this core's interrupt handler.
258 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
260 // grab the Future now before we lose our handle on the Promise
261 auto f = p.getFuture();
// The returned future runs on the same executor as this one.
262 f.core_->setExecutorNoLock(getExecutor());
264 /* This is a bit tricky.
266 We can't just close over *this in case this Future gets moved. So we
267 make a new dummy Future. We could figure out something more
268 sophisticated that avoids making a new Future object when it can, as an
269 optimization. But this is correct.
271 core_ can't be moved, it is explicitly disallowed (as is copying). But
272 if there's ever a reason to allow it, this is one place that makes that
273 assumption and would need to be fixed. We use a standard shared pointer
274 for core_ (by copying it in), which means in essence obj holds a shared
275 pointer to itself. But this shouldn't leak because Promise will not
276 outlive the continuation, because Promise will setException() with a
277 broken Promise if it is destructed before completed. We could use a
278 weak pointer but it would have to be converted to a shared pointer when
279 func is executed (because the Future returned by func may possibly
280 persist beyond the callback, if it gets moved), and so it is an
281 optimization to just make it shared from the get-go.
283 Two subtle but important points about this design. futures::detail::Core
284 has no back pointers to Future or Promise, so if Future or Promise get
285 moved (and they will be moved in performant code) we don't have to do
286 anything fancy. And because we store the continuation in the
287 futures::detail::Core, not in the Future, we can execute the continuation
288 even after the Future has gone out of scope. This is an intentional design
289 decision. It is likely we will want to be able to cancel a continuation
290 in some circumstances, but I think it should be explicit not implicit
291 in the destruction of the Future used to create it.
// Continuation: the promise and func travel together inside `state`.
// If func takes a plain value and t holds an exception, the exception is
// forwarded without invoking func; otherwise func runs under makeTryWith and
// the resulting Try completes the promise.
294 [state = futures::detail::makeCoreCallbackState(
295 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
296 if (!isTry && t.hasException()) {
297 state.setException(std::move(t.exception()));
299 state.setTry(makeTryWith(
300 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
307 // Variant: returns a Future
308 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
//
// Selected when the callback returns a Future (R::ReturnsFuture::value is
// true). The outer promise is completed by the inner future's result.
310 template <typename F, typename R, bool isTry, typename... Args>
311 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
312 Future<T>::thenImplementation(
314 futures::detail::argResult<isTry, F, Args...>) {
315 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
316 typedef typename R::ReturnsFuture::Inner B;
// The new promise inherits this core's interrupt handler.
321 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
323 // grab the Future now before we lose our handle on the Promise
324 auto f = p.getFuture();
// The returned future runs on the same executor as this one.
325 f.core_->setExecutorNoLock(getExecutor());
// Continuation: an upstream exception is forwarded without invoking func when
// func takes a plain value. Otherwise tryInvoke yields tf2, whose value is the
// Future returned by func. If func itself failed, its exception completes the
// promise; if not, the promise is handed to a callback on the inner future.
328 [state = futures::detail::makeCoreCallbackState(
329 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
330 if (!isTry && t.hasException()) {
331 state.setException(std::move(t.exception()));
333 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
334 if (tf2.hasException()) {
335 state.setException(std::move(tf2.exception()));
337 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
338 p.setTry(std::move(b));
// then() overload for a pointer-to-member-function plus an object pointer.
// The raw `instance` pointer is captured by value, so the caller presumably
// has to keep *instance alive until the continuation has run.
347 template <typename T>
348 template <typename R, typename Caller, typename... Args>
349 Future<typename isFuture<R>::Inner>
350 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
// FirstArg: the member function's first parameter type with reference and
// cv-qualifiers removed; used below to decide between Try and value passing.
351 typedef typename std::remove_cv<typename std::remove_reference<
352 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
354 return then([instance, func](Try<T>&& t){
355 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
// Argument-less then(): chains an empty callback, yielding a Future<Unit>.
360 Future<Unit> Future<T>::then() {
361 return then([] () {});
364 // onError where the callback returns T
//
// Enabled when func is not callable with exception_wrapper and does not
// return a Future. The callback's first parameter type (reference removed)
// selects which exception type is handled.
367 typename std::enable_if<
368 !futures::detail::callableWith<F, exception_wrapper>::value &&
369 !futures::detail::Extract<F>::ReturnsFuture::value,
371 Future<T>::onError(F&& func) {
372 typedef std::remove_reference_t<
373 typename futures::detail::Extract<F>::FirstArg>
376 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
377 "Return type of onError callback must be T or Future<T>");
// The new promise inherits this core's interrupt handler.
380 p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
381 auto f = p.getFuture();
// Continuation: when t carries an exception of type Exn, func is invoked with
// it and its result (or its own exception) completes the promise; the other
// setTry passes t through unchanged.
384 [state = futures::detail::makeCoreCallbackState(
385 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
386 if (auto e = t.template tryGetExceptionObject<Exn>()) {
387 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
389 state.setTry(std::move(t));
396 // onError where the callback returns Future<T>
//
// Enabled when func is not callable with exception_wrapper and returns a
// Future; the static_assert pins that return type to Future<T>.
399 typename std::enable_if<
400 !futures::detail::callableWith<F, exception_wrapper>::value &&
401 futures::detail::Extract<F>::ReturnsFuture::value,
403 Future<T>::onError(F&& func) {
405 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
407 "Return type of onError callback must be T or Future<T>");
408 typedef std::remove_reference_t<
409 typename futures::detail::Extract<F>::FirstArg>
413 auto f = p.getFuture();
// Continuation: when t carries an Exn, func is invoked via tryInvoke. If func
// threw, that exception completes the promise; otherwise the promise is
// handed to a callback on the future func returned. The last setTry passes t
// through unchanged.
416 [state = futures::detail::makeCoreCallbackState(
417 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
418 if (auto e = t.template tryGetExceptionObject<Exn>()) {
419 auto tf2 = state.tryInvoke(*e);
420 if (tf2.hasException()) {
421 state.setException(std::move(tf2.exception()));
423 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
424 p.setTry(std::move(t3));
428 state.setTry(std::move(t));
// Chains a continuation that owns func (as funcw) and then passes the
// original Try through via makeFuture, so the result is unchanged.
// NOTE(review): the statement that invokes funcw is not shown here; confirm.
437 Future<T> Future<T>::ensure(F&& func) {
438 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
440 return makeFuture(std::move(t));
// Applies within(dur, tk) and installs an onError handler for TimedOut that
// calls func (as an rvalue) and returns its result.
446 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
447 return within(dur, tk).onError([funcw = std::forward<F>(func)](
448 TimedOut const&) { return std::move(funcw)(); });
// onError overload for callbacks that accept an exception_wrapper and return
// a Future; the static_assert pins that return type to Future<T>.
453 typename std::enable_if<
454 futures::detail::callableWith<F, exception_wrapper>::value &&
455 futures::detail::Extract<F>::ReturnsFuture::value,
457 Future<T>::onError(F&& func) {
459 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
461 "Return type of onError callback must be T or Future<T>");
464 auto f = p.getFuture();
// Continuation: any exception in t is moved into func via tryInvoke. If func
// threw, that exception completes the promise; otherwise the promise is
// handed to a callback on the future func returned. The last setTry passes t
// through unchanged.
466 [state = futures::detail::makeCoreCallbackState(
467 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
468 if (t.hasException()) {
469 auto tf2 = state.tryInvoke(std::move(t.exception()));
470 if (tf2.hasException()) {
471 state.setException(std::move(tf2.exception()));
473 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
474 p.setTry(std::move(t3));
478 state.setTry(std::move(t));
485 // onError(exception_wrapper) that returns T
//
// Enabled when func accepts an exception_wrapper and does not return a
// Future.
488 typename std::enable_if<
489 futures::detail::callableWith<F, exception_wrapper>::value &&
490 !futures::detail::Extract<F>::ReturnsFuture::value,
492 Future<T>::onError(F&& func) {
494 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
496 "Return type of onError callback must be T or Future<T>");
499 auto f = p.getFuture();
// Continuation: any exception in t is moved into func, and func's result (or
// its own exception, captured by makeTryWith) completes the promise. The
// other setTry passes t through unchanged.
501 [state = futures::detail::makeCoreCallbackState(
502 std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
503 if (t.hasException()) {
504 state.setTry(makeTryWith(
505 [&] { return state.invoke(std::move(t.exception())); }));
507 state.setTry(std::move(t));
// Returns a reference to the value stored in the core's Try.
515 typename std::add_lvalue_reference<T>::type Future<T>::value() {
518 return core_->getTry().value();
// Const overload of value().
522 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
525 return core_->getTry().value();
// Returns a reference to the Try stored in the core.
529 Try<T>& Future<T>::getTry() {
532 return core_->getTry();
// Waits by driving executor e (waitVia), then returns the stored Try.
536 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
537 return waitVia(e).getTry();
// Non-blocking check: when the core is ready its Try is moved into the
// Optional `o`.
// NOTE(review): the declaration and return of `o` are not shown; presumably
// an empty Optional is returned when the core is not ready.
541 Optional<Try<T>> Future<T>::poll() {
543 if (core_->ready()) {
544 o = std::move(core_->getTry());
// Rvalue overload: sets the executor and priority on this future and returns
// it by move.
550 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
553 setExecutor(executor, priority);
555 return std::move(*this);
// Lvalue overload: chains a continuation on this future that forwards its
// result into a fresh promise p, and returns p's future moved onto the given
// executor and priority.
559 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
563 auto f = p.getFuture();
564 then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
565 return std::move(f).via(executor, priority);
// Free function: starts from via(x) and chains func onto it with then().
// The return type is the Future form of whatever func() returns.
568 template <class Func>
569 auto via(Executor* x, Func&& func)
570 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
571 // TODO make this actually more performant. :-P #7260175
572 return via(x).then(std::forward<Func>(func));
// True when the core reports that a result is available.
576 bool Future<T>::isReady() const {
578 return core_->ready();
// True when the stored Try holds a value.
582 bool Future<T>::hasValue() {
583 return getTry().hasValue();
// True when the stored Try holds an exception.
587 bool Future<T>::hasException() {
588 return getTry().hasException();
// Passes the exception to the core's raise(), moving it in.
592 void Future<T>::raise(exception_wrapper exception) {
593 core_->raise(std::move(exception));
// Tag constructor producing a Future with no core (core_ == nullptr).
597 Future<T>::Future(futures::detail::EmptyConstruct) noexcept : core_(nullptr) {}
// Makes a completed Future<decay_t<T>> by wrapping the forwarded value in a
// Try and delegating to the Try overload.
602 Future<typename std::decay<T>::type> makeFuture(T&& t) {
603 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
// Makes a completed Future<Unit>.
606 inline // for multiple translation units
607 Future<Unit> makeFuture() {
608 return makeFuture(Unit{});
611 // makeFutureWith(Future<T>()) -> Future<T>
//
// Enabled when func() returns a Future. Calls func and returns its future.
// If func throws, returns a failed future instead: a std::exception is
// wrapped together with the caught reference, any other exception is wrapped
// from std::current_exception() alone.
613 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
614 typename std::result_of<F()>::type>::type
615 makeFutureWith(F&& func) {
617 typename isFuture<typename std::result_of<F()>::type>::Inner;
619 return std::forward<F>(func)();
620 } catch (std::exception& e) {
621 return makeFuture<InnerType>(
622 exception_wrapper(std::current_exception(), e));
624 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
628 // makeFutureWith(T()) -> Future<T>
629 // makeFutureWith(void()) -> Future<Unit>
//
// Enabled when func() does not return a Future. Runs func under makeTryWith
// and wraps the resulting Try in a completed future. The result type goes
// through Unit::Lift (see the void case above).
631 typename std::enable_if<
632 !(isFuture<typename std::result_of<F()>::type>::value),
633 Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
634 makeFutureWith(F&& func) {
636 typename Unit::Lift<typename std::result_of<F()>::type>::type;
637 return makeFuture<LiftedResult>(
638 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
// Makes a failed Future<T> from a std::exception_ptr.
642 Future<T> makeFuture(std::exception_ptr const& e) {
643 return makeFuture(Try<T>(e));
// Makes a failed Future<T> from an exception_wrapper (moved into the Try).
647 Future<T> makeFuture(exception_wrapper ew) {
648 return makeFuture(Try<T>(std::move(ew)));
// Makes a failed Future<T> from an exception object; only enabled for types
// derived from std::exception. The exception is copied into a wrapper.
651 template <class T, class E>
652 typename std::enable_if<std::is_base_of<std::exception, E>::value,
654 makeFuture(E const& e) {
655 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
// Makes a completed Future<T> by allocating a Core that holds the given Try.
659 Future<T> makeFuture(Try<T>&& t) {
660 return Future<T>(new futures::detail::Core<T>(std::move(t)));
// Free function: a completed Future<Unit> placed on the given executor and
// priority.
664 Future<Unit> via(Executor* executor, int8_t priority) {
665 return makeFuture().via(executor, priority);
668 // mapSetCallback calls func(i, Try<T>) when every future completes
//
// Walks [first, last) and installs on each future a callback that calls
// func(i, result), where i is the future's zero-based position in the range.
// func is taken by value and copied into every callback.
670 template <class T, class InputIterator, class F>
671 void mapSetCallback(InputIterator first, InputIterator last, F func) {
672 for (size_t i = 0; first != last; ++first, ++i) {
673 first->setCallback_([func, i](Try<T>&& t) {
674 func(i, std::move(t));
679 // collectAll (variadic)
//
// Creates a shared CollectAllVariadicContext for the value types of fs,
// registers every future with it through collectVariadicHelper, and returns
// the future of the context's promise.
681 template <typename... Fs>
682 typename futures::detail::CollectAllVariadicContext<
683 typename std::decay<Fs>::type::value_type...>::type
684 collectAll(Fs&&... fs) {
685 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
686 typename std::decay<Fs>::type::value_type...>>();
687 futures::detail::collectVariadicHelper<
688 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
689 return ctx->p.getFuture();
692 // collectAll (iterator)
//
// Collects the Try of every future in [first, last) into a vector, indexed by
// position in the range.
694 template <class InputIterator>
697 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
698 collectAll(InputIterator first, InputIterator last) {
700 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// Shared state: the promise is fulfilled from the destructor. Every callback
// holds a shared_ptr copy of the context, so the destructor presumably runs
// once the last of those copies has been released.
702 struct CollectAllContext {
703 CollectAllContext(size_t n) : results(n) {}
704 ~CollectAllContext() {
705 p.setValue(std::move(results));
707 Promise<std::vector<Try<T>>> p;
708 std::vector<Try<T>> results;
// results is pre-sized to the number of input futures.
712 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
713 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
714 ctx->results[i] = std::move(t);
716 return ctx->p.getFuture();
719 // collect (iterator)
//
// Shared state for the iterator form of collect().
724 template <typename T>
725 struct CollectContext {
// Placeholder constructible from a size; presumably the result type used
// when T is void.
727 explicit Nothing(int /* n */) {}
// Result: type delivered to the caller (std::vector<T> for non-void T).
730 using Result = typename std::conditional<
731 std::is_void<T>::value,
733 std::vector<T>>::type;
// InternalResult: per-slot storage while futures are still completing
// (std::vector<Optional<T>> for non-void T).
735 using InternalResult = typename std::conditional<
736 std::is_void<T>::value,
738 std::vector<Optional<T>>>::type;
740 explicit CollectContext(size_t n) : result(n) {}
// If no failure has claimed `threw` yet, unwrap each Optional<T> slot into a
// vector<T> and fulfil the promise with it.
742 if (!threw.exchange(true)) {
743 // map Optional<T> -> T
744 std::vector<T> finalResult;
745 finalResult.reserve(result.size());
746 std::transform(result.begin(), result.end(),
747 std::back_inserter(finalResult),
748 [](Optional<T>& o) { return std::move(o.value()); });
749 p.setValue(std::move(finalResult));
// Stores the value of t into slot i (moved out of the Try).
752 inline void setPartialResult(size_t i, Try<T>& t) {
753 result[i] = std::move(t.value());
756 InternalResult result;
// Set by whichever path completes the promise first (failure or final value).
757 std::atomic<bool> threw {false};
760 } // namespace detail
761 } // namespace futures
// Collects the values of all futures in [first, last). The first exception
// seen fails the result immediately; later results are then ignored.
763 template <class InputIterator>
764 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
765 InputIterator>::value_type::value_type>::Result>
766 collect(InputIterator first, InputIterator last) {
768 typename std::iterator_traits<InputIterator>::value_type::value_type T;
770 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
771 std::distance(first, last));
772 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
// Only the first failing future wins the exchange and sets the exception.
773 if (t.hasException()) {
774 if (!ctx->threw.exchange(true)) {
775 ctx->p.setException(std::move(t.exception()));
// Values are recorded only while no failure has been seen.
777 } else if (!ctx->threw) {
778 ctx->setPartialResult(i, t);
781 return ctx->p.getFuture();
784 // collect (variadic)
//
// Creates a shared CollectVariadicContext for the value types of fs,
// registers every future with it through collectVariadicHelper, and returns
// the future of the context's promise.
786 template <typename... Fs>
787 typename futures::detail::CollectVariadicContext<
788 typename std::decay<Fs>::type::value_type...>::type
789 collect(Fs&&... fs) {
790 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
791 typename std::decay<Fs>::type::value_type...>>();
792 futures::detail::collectVariadicHelper<
793 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
794 return ctx->p.getFuture();
797 // collectAny (iterator)
//
// Completes with (index, Try) of whichever future in [first, last) finishes
// first, whether it holds a value or an exception.
799 template <class InputIterator>
804 std::iterator_traits<InputIterator>::value_type::value_type>>>
805 collectAny(InputIterator first, InputIterator last) {
807 typename std::iterator_traits<InputIterator>::value_type::value_type T;
809 struct CollectAnyContext {
810 CollectAnyContext() {}
811 Promise<std::pair<size_t, Try<T>>> p;
// Claimed atomically by the first callback to run.
812 std::atomic<bool> done {false};
815 auto ctx = std::make_shared<CollectAnyContext>();
816 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
// exchange() returns the previous flag, so only the first caller proceeds.
817 if (!ctx->done.exchange(true)) {
818 ctx->p.setValue(std::make_pair(i, std::move(t)));
821 return ctx->p.getFuture();
824 // collectAnyWithoutException (iterator)
//
// Completes with (index, value) of the first future in [first, last) that
// finishes with a value. Only if every future fails does it complete with an
// exception, namely that of the callback that brought the count to nTotal.
826 template <class InputIterator>
829 typename std::iterator_traits<InputIterator>::value_type::value_type>>
830 collectAnyWithoutException(InputIterator first, InputIterator last) {
832 typename std::iterator_traits<InputIterator>::value_type::value_type T;
834 struct CollectAnyWithoutExceptionContext {
835 CollectAnyWithoutExceptionContext(){}
836 Promise<std::pair<size_t, T>> p;
837 std::atomic<bool> done{false};
// Counts callbacks that did NOT deliver the winning value.
838 std::atomic<size_t> nFulfilled{0};
842 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
843 ctx->nTotal = size_t(std::distance(first, last));
845 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
// The winning success never increments nFulfilled, so the counter can only
// reach nTotal when no future produced a value.
846 if (!t.hasException() && !ctx->done.exchange(true)) {
847 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
848 } else if (++ctx->nFulfilled == ctx->nTotal) {
849 ctx->p.setException(t.exception());
852 return ctx->p.getFuture();
855 // collectN (iterator)
//
// Completes with the (index, Try) pairs of the first n futures in
// [first, last) to finish. Fails with std::runtime_error("Not enough
// futures") when the range holds fewer than n futures.
857 template <class InputIterator>
858 Future<std::vector<std::pair<size_t, Try<typename
859 std::iterator_traits<InputIterator>::value_type::value_type>>>>
860 collectN(InputIterator first, InputIterator last, size_t n) {
862 std::iterator_traits<InputIterator>::value_type::value_type T;
863 typedef std::vector<std::pair<size_t, Try<T>>> V;
865 struct CollectNContext {
// Number of futures that have completed so far.
867 std::atomic<size_t> completed = {0};
870 auto ctx = std::make_shared<CollectNContext>();
872 if (size_t(std::distance(first, last)) < n) {
873 ctx->p.setException(std::runtime_error("Not enough futures"));
875 // for each completed Future, increase count and add to vector, until we
876 // have n completed futures at which point we fulfil our Promise with the
878 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
879 auto c = ++ctx->completed;
// NOTE(review): ctx->v is appended to from the per-future callbacks and no
// lock is visible here; if callbacks can run concurrently this needs
// synchronization. Confirm.
881 assert(ctx->v.size() < n);
882 ctx->v.emplace_back(i, std::move(t));
884 ctx->p.setTry(Try<V>(std::move(ctx->v)));
890 return ctx->p.getFuture();
// Ordered reduction over the futures in [first, last): func folds `initial`
// with each future's result in iteration order and the final accumulator is
// returned as a Future<T>.
895 template <class It, class T, class F>
896 Future<T> reduce(It first, It last, T&& initial, F&& func) {
// Returned when there is nothing to fold (presumably the empty-range case).
898 return makeFuture(std::move(initial));
901 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
// Arg depends on whether func is callable with (T&&, Try<ItT>&&).
902 typedef typename std::conditional<
903 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
906 typedef isTry<Arg> IsTry;
// func is shared between the continuations created below.
908 auto sfunc = std::make_shared<F>(std::move(func));
// First step: combine the initial value with the first future's result.
910 auto f = first->then(
911 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
913 std::move(minitial), head.template get<IsTry::value, Arg&&>());
// Each further step waits for both the running accumulator and the next
// input, then applies func to them.
916 for (++first; first != last; ++first) {
917 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
918 return (*sfunc)(std::move(std::get<0>(t).value()),
919 // Either return a ItT&& or a Try<ItT>&& depending
920 // on the type of the argument of func.
921 std::get<1>(t).template get<IsTry::value, Arg&&>());
928 // window (collection)
//
// Applies func to every element of input with at most n calls in flight, and
// returns one future per input element.
930 template <class Collection, class F, class ItT, class Result>
931 std::vector<Future<Result>>
932 window(Collection input, F func, size_t n) {
933 struct WindowContext {
// One promise per input element.
934 WindowContext(Collection&& i, F&& fn)
935 : input_(std::move(i)), promises_(input_.size()),
// Index of the next input element to start.
938 std::atomic<size_t> i_ {0};
940 std::vector<Promise<Result>> promises_;
// Claims the next index; if one is left, starts func on that element and,
// once it completes, fulfils the matching promise and spawns the next one.
943 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
944 size_t i = ctx->i_++;
945 if (i < ctx->input_.size()) {
946 // Using setCallback_ directly since we don't need the Future
947 ctx->func_(std::move(ctx->input_[i])).setCallback_(
948 // ctx is captured by value
949 [ctx, i](Try<Result>&& t) {
950 ctx->promises_[i].setTry(std::move(t));
951 // Chain another future onto this one
952 spawn(std::move(ctx));
// Never start more calls than there are inputs.
958 auto max = std::min(n, input.size());
960 auto ctx = std::make_shared<WindowContext>(
961 std::move(input), std::move(func));
963 for (size_t i = 0; i < max; ++i) {
964 // Start the first n Futures
965 WindowContext::spawn(ctx);
// Hand out the future of every promise, in input order.
968 std::vector<Future<Result>> futures;
969 futures.reserve(ctx->promises_.size());
970 for (auto& promise : ctx->promises_) {
971 futures.emplace_back(promise.getFuture());
// Member reduce: folds the elements of `vals` (presumably this future's
// collection value) into an accumulator starting at `initial`, calling
// func(accumulator, element) for each one.
980 template <class I, class F>
981 Future<I> Future<T>::reduce(I&& initial, F&& func) {
983 minitial = std::forward<I>(initial),
984 mfunc = std::forward<F>(func)
986 auto ret = std::move(minitial);
987 for (auto& val : vals) {
988 ret = mfunc(std::move(ret), std::move(val));
994 // unorderedReduce (iterator)
//
// Like reduce(), but folds results in the order the futures complete rather
// than in iteration order.
996 template <class It, class T, class F, class ItT, class Arg>
997 Future<T> unorderedReduce(It first, It last, T initial, F func) {
// Returned when there is nothing to fold (presumably the empty-range case).
999 return makeFuture(std::move(initial));
1002 typedef isTry<Arg> IsTry;
1004 struct UnorderedReduceContext {
// memo_ starts as a completed future holding the initial value.
1005 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1006 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1007 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1009 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1012 size_t numThens_; // how many Futures completed and called .then()
1013 size_t numFutures_; // how many Futures in total
1014 Promise<T> promise_;
1017 auto ctx = std::make_shared<UnorderedReduceContext>(
1018 std::move(initial), std::move(func), std::distance(first, last));
1020 mapSetCallback<ItT>(
1023 [ctx](size_t /* i */, Try<ItT>&& t) {
1024 // Futures can be completed in any order, simultaneously.
1025 // To make this non-blocking, we create a new Future chain in
1026 // the order of completion to reduce the values.
1027 // The spinlock just protects chaining a new Future, not actually
1028 // executing the reduce, which should be really fast.
1029 folly::MSLGuard lock(ctx->lock_);
// Extend the chain: combine the running value v with this future's result.
1031 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1032 // Either return a ItT&& or a Try<ItT>&& depending
1033 // on the type of the argument of func.
1034 return ctx->func_(std::move(v),
1035 mt.template get<IsTry::value, Arg&&>());
1037 if (++ctx->numThens_ == ctx->numFutures_) {
1038 // After reducing the value of the last Future, fulfill the Promise
1039 ctx->memo_.setCallback_(
1040 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1044 return ctx->promise_.getFuture();
// Convenience overload: within() with a default-constructed TimedOut as the
// timeout exception.
1050 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1051 return within(dur, TimedOut(), tk);
// Returns a future that completes with this future's result, or with the
// exception e if the timer for `dur` fires first. Both paths race on an
// atomic token; whichever flips it first completes the promise.
1056 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1059 Context(E ex) : exception(std::move(ex)), promise() {}
// Handle on the continuation below, kept so the timer path can raise on it.
1061 Future<Unit> thisFuture;
1063 std::atomic<bool> token {false};
// Obtain a Timekeeper from the singleton (presumably only when tk is null);
// tks keeps it alive for the rest of this call.
1066 std::shared_ptr<Timekeeper> tks;
1068 tks = folly::detail::getTimekeeperSingleton();
1069 tk = DCHECK_NOTNULL(tks.get());
1072 auto ctx = std::make_shared<Context>(std::move(e));
1074 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1075 // TODO: "this" completed first, cancel "after"
1076 if (ctx->token.exchange(true) == false) {
1077 ctx->promise.setTry(std::move(t));
1081 tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
1082 // "after" completed first, cancel "this"
1083 ctx->thisFuture.raise(TimedOut());
1084 if (ctx->token.exchange(true) == false) {
// A failure of the timer itself is reported instead of the timeout exception.
1085 if (t.hasException()) {
1086 ctx->promise.setException(std::move(t.exception()));
1088 ctx->promise.setException(std::move(ctx->exception));
// The result is delivered on this future's executor.
1093 return ctx->promise.getFuture().via(getExecutor());
// Returns a future carrying this future's result, available only once both
// this future and a sleep of `dur` have completed (collectAll of the two).
1099 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1100 return collectAll(*this, futures::sleep(dur, tk))
1101 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
// Only the first element (this future's result) is passed on.
1102 Try<T>& t = std::get<0>(tup);
1103 return makeFuture<T>(std::move(t));
// Blocking wait helper: installs a callback that posts a local baton when f
// completes; f is asserted ready afterwards.
// NOTE(review): the baton wait between setCallback_ and the assert is not
// shown here; confirm.
1111 void waitImpl(Future<T>& f) {
1112 // short-circuit if there's nothing to do
1113 if (f.isReady()) return;
1115 FutureBatonType baton;
// The baton is captured by reference, so it must outlive the callback.
1116 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1118 assert(f.isReady());
// Timed wait helper. The result of f is forwarded into a fresh promise whose
// future is `ret`, and a shared baton is waited on for at most `dur`.
// NOTE(review): the baton post and the assignment of `ret` back to f are not
// shown here; confirm.
1122 void waitImpl(Future<T>& f, Duration dur) {
1123 // short-circuit if there's nothing to do
1129 auto ret = promise.getFuture();
// The baton is shared because the callback may outlive this call on timeout.
1130 auto baton = std::make_shared<FutureBatonType>();
1131 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1132 promise.setTry(std::move(t));
1136 if (baton->timed_wait(dur)) {
1137 assert(f.isReady());
// Waits for f by moving it onto executor e and looping until it is ready.
// NOTE(review): the loop body is not shown; presumably it drives e. Confirm.
1142 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1143 // Set callback so to ensure that the via executor has something on it
1144 // so that once the preceding future triggers this callback, drive will
1145 // always have a callback to satisfy it
1148 f = f.via(e).then([](T&& t) { return std::move(t); });
1149 while (!f.isReady()) {
1152 assert(f.isReady());
1155 } // namespace detail
1156 } // namespace futures
// Blocks until this future is ready (lvalue overload).
1159 Future<T>& Future<T>::wait() & {
1160 futures::detail::waitImpl(*this);
// Rvalue overload: waits, then returns this future by move.
1165 Future<T>&& Future<T>::wait() && {
1166 futures::detail::waitImpl(*this);
1167 return std::move(*this);
// Waits for at most dur (lvalue overload).
1171 Future<T>& Future<T>::wait(Duration dur) & {
1172 futures::detail::waitImpl(*this, dur);
// Rvalue overload of the timed wait.
1177 Future<T>&& Future<T>::wait(Duration dur) && {
1178 futures::detail::waitImpl(*this, dur);
1179 return std::move(*this);
// Waits through executor e via waitViaImpl (lvalue overload).
1183 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1184 futures::detail::waitViaImpl(*this, e);
// Rvalue overload of waitVia.
1189 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1190 futures::detail::waitViaImpl(*this, e);
1191 return std::move(*this);
// Blocks until ready and moves the value out.
1195 T Future<T>::get() {
1196 return std::move(wait().value());
// Timed variant; moves the value out.
// NOTE(review): the wait and the not-ready handling are not shown; confirm.
1200 T Future<T>::get(Duration dur) {
1203 return std::move(value());
// Waits through executor e, then moves the value out.
1210 T Future<T>::getVia(DrivableExecutor* e) {
1211 return std::move(waitVia(e).value());
// Compares the values held by two Try objects with operator==.
1218 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1219 return t1.value() == t2.value();
1222 } // namespace detail
1223 } // namespace futures
// Returns a Future<bool> that resolves once both this future and f complete.
// When both hold values the result is their TryEquals comparison.
// NOTE(review): the other branch is not shown; presumably it yields false.
1226 Future<bool> Future<T>::willEqual(Future<T>& f) {
1227 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1228 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1229 return futures::detail::TryEquals<T>::equals(
1230 std::get<0>(t), std::get<1>(t));
// Chains a continuation that throws PredicateDoesNotObtain when the predicate
// rejects the value. The predicate only ever sees a const reference.
// NOTE(review): the return for an accepted value is not shown; presumably it
// passes the value on. Confirm.
1239 Future<T> Future<T>::filter(F&& predicate) {
1240 return this->then([p = std::forward<F>(predicate)](T val) {
1241 T const& valConstRef = val;
1242 if (!p(valConstRef)) {
1243 throw PredicateDoesNotObtain();
// Runs thunk and returns its future as Future<Unit> when p is true;
// otherwise returns an already completed Future<Unit> without calling thunk.
1250 inline Future<Unit> when(bool p, F&& thunk) {
1251 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// Asynchronous loop: runs thunk, and when its future completes recurses with
// the same predicate and thunk. The final return yields a completed
// Future<Unit>.
// NOTE(review): the predicate check guarding the loop body is not shown.
1254 template <class P, class F>
1255 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1257 auto future = thunk();
// predicate and thunk are captured into the continuation for the next round.
1258 return future.then([
1259 predicate = std::forward<P>(predicate),
1260 thunk = std::forward<F>(thunk)
1262 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1265 return makeFuture();
// Runs thunk n times through whileDo. The predicate owns a heap-allocated
// atomic counter and returns true for the first n calls.
1269 Future<Unit> times(const int n, F&& thunk) {
1270 return folly::whileDo(
1271 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
// fetch_add returns the previous value, so this is true for values 0..n-1.
1272 return count->fetch_add(1) < n;
1274 std::forward<F>(thunk));
// Chains func onto every future in [first, last) with then() and collects the
// resulting futures, in iteration order, into `results`.
1278 template <class It, class F, class ItT, class Result>
1279 std::vector<Future<Result>> map(It first, It last, F func) {
1280 std::vector<Future<Result>> results;
1281 for (auto it = first; it != last; it++) {
1282 results.push_back(it->then(func));
// Dispatch tags: the retry policy returns a plain bool (raw) or a
// Future<bool> (fut).
1292 struct retrying_policy_raw_tag {};
1293 struct retrying_policy_fut_tag {};
// Classifies a policy by the result of calling it with
// (size_t, const exception_wrapper&). `tag` is the raw tag for bool, the fut
// tag for Future<bool>, and void for anything else.
1295 template <class Policy>
1296 struct retrying_policy_traits {
1297 using result = std::result_of_t<Policy(size_t, const exception_wrapper&)>;
1298 using is_raw = std::is_same<result, bool>;
1299 using is_fut = std::is_same<result, Future<bool>>;
1300 using tag = typename std::conditional<
1301 is_raw::value, retrying_policy_raw_tag, typename std::conditional<
1302 is_fut::value, retrying_policy_fut_tag, void>::type>::type;
// One retry round. Calls ff with the current attempt index (k is
// post-incremented), wrapped in makeFutureWith. A value completes prom; on
// failure a continuation taking `bool shouldRetry` either recurses into
// retryingImpl or fails prom with the saved exception.
1305 template <class Policy, class FF, class Prom>
1306 void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
1307 using F = typename std::result_of<FF(size_t)>::type;
1308 using T = typename F::value_type;
1309 auto f = makeFutureWith([&] { return ff(k++); });
// The promise, policy and factory are moved into the continuation so they
// survive until the attempt finishes.
1312 prom = std::move(prom),
1313 pm = std::forward<Policy>(p),
1314 ffm = std::forward<FF>(ff)
1315 ](Try<T> && t) mutable {
1317 prom.setValue(std::move(t).value());
1320 auto& x = t.exception();
1324 prom = std::move(prom),
1327 ffm = std::move(ffm)
1328 ](bool shouldRetry) mutable {
1330 retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
1332 prom.setException(std::move(xm));
// retrying(k, p, ff): creates a Promise<T>, where T is the value_type of the
// future type returned by ff(size_t), obtains its future, and passes k, the
// forwarded policy and function, and the moved promise on to the retry
// machinery.
// NOTE(review): the callee name preceding the argument list and the return
// statement are not visible in this excerpt - presumably `retryingImpl(` and
// `return f;`; confirm.
1338 template <class Policy, class FF>
1339 typename std::result_of<FF(size_t)>::type
1340 retrying(size_t k, Policy&& p, FF&& ff) {
1341 using F = typename std::result_of<FF(size_t)>::type;
1342 using T = typename F::value_type;
1343 auto prom = Promise<T>();
// Take the future before the promise is moved away below.
1344 auto f = prom.getFuture();
1346 k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
// Overload selected by retrying_policy_raw_tag. Wraps the policy in a lambda
// that lifts its result into a future with makeFuture<bool>, then starts
// retrying at attempt 0 with the wrapped policy.
1350 template <class Policy, class FF>
1351 typename std::result_of<FF(size_t)>::type
1352 retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
1353 auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
1354 return makeFuture<bool>(pm(k, x));
1356 return retrying(0, std::move(q), std::forward<FF>(ff));
// Overload selected by retrying_policy_fut_tag. The policy needs no
// adaptation, so it is forwarded unchanged and retrying starts at attempt 0.
1359 template <class Policy, class FF>
1360 typename std::result_of<FF(size_t)>::type
1361 retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
1362 return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
1365 // jittered exponential backoff, clamped to [backoff_min, backoff_max]
1366 template <class URNG>
1367 Duration retryingJitteredExponentialBackoffDur(
1369 Duration backoff_min,
1370 Duration backoff_max,
1371 double jitter_param,
1374 auto dist = std::normal_distribution<double>(0.0, jitter_param);
1375 auto jitter = std::exp(dist(rng));
1376 auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
1377 return std::max(backoff_min, std::min(backoff_max, backoff));
// Builds a Future<bool>-returning retry policy around policy `p`. The
// returned callable captures the policy as `pm` and the generator as `rngp`
// and, for attempt n with exception ex:
//   - yields makeFuture(false) when n equals max_tries;
//   - otherwise chains on pm(n, ex); that continuation either yields
//     makeFuture(false), or computes a delay with
//     detail::retryingJitteredExponentialBackoffDur, waits for it with
//     futures::sleep, and then yields true.
// NOTE(review): some parameter lines, part of the capture lists and the
// condition guarding the inner `return makeFuture(false)` are missing from
// this excerpt - presumably that branch is taken when the wrapped policy
// answers false; confirm.
1380 template <class Policy, class URNG>
1381 std::function<Future<bool>(size_t, const exception_wrapper&)>
1382 retryingPolicyCappedJitteredExponentialBackoff(
1384 Duration backoff_min,
1385 Duration backoff_max,
1386 double jitter_param,
1390 pm = std::forward<Policy>(p),
1395 rngp = std::forward<URNG>(rng)
1396 ](size_t n, const exception_wrapper& ex) mutable {
1397 if (n == max_tries) {
1398 return makeFuture(false);
1400 return pm(n, ex).then(
1401 [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
1404 return makeFuture(false);
1406 auto backoff = detail::retryingJitteredExponentialBackoffDur(
1407 n, backoff_min, backoff_max, jitter_param, rngp);
1408 return futures::sleep(backoff).then([] { return true; });
// Overload selected by retrying_policy_raw_tag. Wraps the policy in a lambda
// `q` that lifts its result into a future with makeFuture, then delegates to
// retryingPolicyCappedJitteredExponentialBackoff with the forwarded
// generator.
// NOTE(review): some parameter lines and most of the delegated argument list
// are missing from this excerpt; presumably `q` is passed as the policy.
1413 template <class Policy, class URNG>
1414 std::function<Future<bool>(size_t, const exception_wrapper&)>
1415 retryingPolicyCappedJitteredExponentialBackoff(
1417 Duration backoff_min,
1418 Duration backoff_max,
1419 double jitter_param,
1422 retrying_policy_raw_tag) {
1423 auto q = [pm = std::forward<Policy>(p)](
1424 size_t n, const exception_wrapper& e) {
1425 return makeFuture(pm(n, e));
1427 return retryingPolicyCappedJitteredExponentialBackoff(
1432 std::forward<URNG>(rng),
// Overload selected by retrying_policy_fut_tag. The policy is forwarded
// unchanged, together with the forwarded generator, to
// retryingPolicyCappedJitteredExponentialBackoff.
// NOTE(review): some parameter lines and the leading delegated arguments are
// missing from this excerpt.
1436 template <class Policy, class URNG>
1437 std::function<Future<bool>(size_t, const exception_wrapper&)>
1438 retryingPolicyCappedJitteredExponentialBackoff(
1440 Duration backoff_min,
1441 Duration backoff_max,
1442 double jitter_param,
1445 retrying_policy_fut_tag) {
1446 return retryingPolicyCappedJitteredExponentialBackoff(
1451 std::forward<URNG>(rng),
1452 std::forward<Policy>(p));
// retrying(p, ff): entry point that retries `ff` under policy `p`. Looks up
// the policy's tag through detail::retrying_policy_traits and dispatches to
// the matching detail::retrying overload. The return type is whatever
// ff(size_t) returns.
1456 template <class Policy, class FF>
1457 typename std::result_of<FF(size_t)>::type
1458 retrying(Policy&& p, FF&& ff) {
1459 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1460 return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
// retryingPolicyBasic: returns a bool-valued policy that ignores the
// exception and answers `n < max_tries`. max_tries is captured by copy.
// NOTE(review): the parameter list is not visible in this excerpt.
1464 std::function<bool(size_t, const exception_wrapper&)>
1465 retryingPolicyBasic(
1467 return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
// Entry point taking a caller-supplied policy and generator. Looks up the
// policy's tag through detail::retrying_policy_traits and dispatches to
// detail::retryingPolicyCappedJitteredExponentialBackoff, forwarding the
// generator and the policy.
// NOTE(review): some parameter lines and the remaining delegated arguments
// are missing from this excerpt.
1470 template <class Policy, class URNG>
1471 std::function<Future<bool>(size_t, const exception_wrapper&)>
1472 retryingPolicyCappedJitteredExponentialBackoff(
1474 Duration backoff_min,
1475 Duration backoff_max,
1476 double jitter_param,
1479 using tag = typename detail::retrying_policy_traits<Policy>::tag;
1480 return detail::retryingPolicyCappedJitteredExponentialBackoff(
1485 std::forward<URNG>(rng),
1486 std::forward<Policy>(p),
// Convenience overload without a caller-supplied policy. Defines a policy `p`
// that always answers true and delegates to
// retryingPolicyCappedJitteredExponentialBackoff.
// NOTE(review): the first parameter line and the delegated argument list are
// missing from this excerpt, so which generator is used cannot be seen here.
1491 std::function<Future<bool>(size_t, const exception_wrapper&)>
1492 retryingPolicyCappedJitteredExponentialBackoff(
1494 Duration backoff_min,
1495 Duration backoff_max,
1496 double jitter_param) {
1497 auto p = [](size_t, const exception_wrapper&) { return true; };
1498 return retryingPolicyCappedJitteredExponentialBackoff(
// Explicit instantiation declarations for the most common Future types.
// `extern template` tells each translation unit that includes this header
// not to instantiate these specializations implicitly, which saves compile
// time.
// NOTE(review): the matching explicit instantiation definitions are expected
// to live in a single source file elsewhere - confirm.
1509 // Instantiate the most common Future types to save compile time
1510 extern template class Future<Unit>;
1511 extern template class Future<bool>;
1512 extern template class Future<int>;
1513 extern template class Future<int64_t>;
1514 extern template class Future<std::string>;
1515 extern template class Future<double>;
1517 } // namespace folly