2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/futures/Timekeeper.h>
27 #include <folly/futures/detail/Core.h>
29 #ifndef FOLLY_FUTURE_USING_FIBER
30 #if FOLLY_MOBILE || defined(__APPLE__)
31 #define FOLLY_FUTURE_USING_FIBER 0
33 #define FOLLY_FUTURE_USING_FIBER 1
34 #include <folly/fibers/Baton.h>
44 #if FOLLY_FUTURE_USING_FIBER
45 typedef folly::fibers::Baton FutureBatonType;
47 typedef folly::Baton<> FutureBatonType;
50 } // namespace futures
53 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
// CoreCallbackState<T, F> keeps a continuation functor (func_) together with
// the Promise<T> that continuation is meant to fulfil. "Before the barrier"
// (see before_barrier()) means promise_ has not been fulfilled yet.
58 // Guarantees that the stored functor is destructed before the stored promise
59 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
60 template <typename T, typename F>
61 class CoreCallbackState {
// Takes over `promise` and builds the functor from `func`. noexcept exactly
// when constructing F from FF is noexcept.
63 template <typename FF>
64 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
65 noexcept(F(std::declval<FF>())))
66 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
67 assert(before_barrier());
// Move constructor. State is transferred only while `that` is still before
// the barrier: the functor is placement-constructed into func_ and the promise
// is taken through that.stealPromise(). Otherwise promise_ keeps its empty
// default (Promise<T>::makeEmpty()).
70 CoreCallbackState(CoreCallbackState&& that) noexcept(
71 noexcept(F(std::declval<F>()))) {
72 if (that.before_barrier()) {
73 new (&func_) F(std::move(that.func_));
74 promise_ = that.stealPromise();
// Move assignment is explicitly disabled.
78 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
// NOTE(review): the statement executed inside this `if` is not visible in this
// view; confirm the clean-up performed while still before the barrier.
80 ~CoreCallbackState() {
81 if (before_barrier()) {
// Invokes the stored functor as an rvalue with the forwarded arguments.
// Must only be called before the barrier (asserted).
86 template <typename... Args>
87 auto invoke(Args&&... args) noexcept(
88 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
89 assert(before_barrier());
90 return std::move(func_)(std::forward<Args>(args)...);
// Same call as invoke(), routed through makeTryWith so the outcome comes back
// as whatever makeTryWith returns (presumably a Try holding value or
// exception; confirm against makeTryWith).
93 template <typename... Args>
94 auto tryInvoke(Args&&... args) noexcept {
95 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
// Takes the promise out of this state and fulfils it with `t`.
98 void setTry(Try<T>&& t) {
99 stealPromise().setTry(std::move(t));
// Takes the promise out of this state and fulfils it with exception `ew`.
102 void setException(exception_wrapper&& ew) {
103 stealPromise().setException(std::move(ew));
// Moves promise_ out to the caller; only valid before the barrier (asserted).
// NOTE(review): a statement between the assert and the return is not visible
// in this view.
106 Promise<T> stealPromise() noexcept {
107 assert(before_barrier());
109 return std::move(promise_);
// True while promise_ has not been fulfilled.
113 bool before_barrier() const noexcept {
114 return !promise_.isFulfilled();
// Starts out as an empty promise unless a constructor replaces it.
120 Promise<T> promise_{Promise<T>::makeEmpty()};
123 template <typename T, typename F>
124 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
125 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
126 std::declval<Promise<T>&&>(),
127 std::declval<F&&>()))) {
128 return CoreCallbackState<T, _t<std::decay<F>>>(
129 std::move(p), std::forward<F>(f));
131 } // namespace detail
132 } // namespace futures
135 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
136 return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
139 inline SemiFuture<Unit> makeSemiFuture() {
140 return makeSemiFuture(Unit{});
143 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
// Overload selected when F() already yields a SemiFuture: the callable's
// result is returned as-is. If the call throws, the error is wrapped in an
// exception_wrapper and returned as a SemiFuture<InnerType>. For
// std::exception-derived errors the caught reference is passed along with
// std::current_exception().
// NOTE(review): the `try`, the second catch clause and the head of the
// InnerType alias are not visible in this view.
145 typename std::enable_if<
146 isSemiFuture<typename std::result_of<F()>::type>::value,
147 typename std::result_of<F()>::type>::type
148 makeSemiFutureWith(F&& func) {
150 typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
152 return std::forward<F>(func)();
153 } catch (std::exception& e) {
154 return makeSemiFuture<InnerType>(
155 exception_wrapper(std::current_exception(), e));
157 return makeSemiFuture<InnerType>(
158 exception_wrapper(std::current_exception()));
162 // makeSemiFutureWith(T()) -> SemiFuture<T>
163 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
165 typename std::enable_if<
166 !(isSemiFuture<typename std::result_of<F()>::type>::value),
167 SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
168 makeSemiFutureWith(F&& func) {
169 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
170 return makeSemiFuture<LiftedResult>(
171 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
175 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
176 return makeSemiFuture(Try<T>(e));
180 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
181 return makeSemiFuture(Try<T>(std::move(ew)));
184 template <class T, class E>
186 enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
187 makeSemiFuture(E const& e) {
188 return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
192 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
193 return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
197 SemiFuture<T> SemiFuture<T>::makeEmpty() {
198 return SemiFuture<T>(futures::detail::EmptyConstruct{});
202 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept : core_(other.core_) {
203 other.core_ = nullptr;
207 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
208 std::swap(core_, other.core_);
213 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept : core_(other.core_) {
214 other.core_ = nullptr;
218 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
219 std::swap(core_, other.core_);
// Value constructors for SemiFuture<T>. Each one allocates a new Core<T>.
// From a value: `val` is forwarded into a Try<T> that seeds the core.
224 template <class T2, typename>
225 SemiFuture<T>::SemiFuture(T2&& val)
226 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
// Enabled only when T2 is Unit: seeds the core with a value-initialized T.
229 template <typename T2>
230 SemiFuture<T>::SemiFuture(
231 typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
232 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
// In-place form: requires T to be constructible from Args and forwards the
// arguments to Core<T>'s in_place constructor.
// NOTE(review): parts of this template header are not visible in this view.
237 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
239 SemiFuture<T>::SemiFuture(in_place_t, Args&&... args)
241 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
245 SemiFuture<T>::~SemiFuture() {
// Returns a reference to the value held by the core's Try.
// NOTE(review): lines between each signature and its return statement are not
// visible in this view.
250 typename std::add_lvalue_reference<T>::type SemiFuture<T>::value() {
253 return core_->getTry().value();
// Const overload: same access, yielding a reference to const T.
257 typename std::add_lvalue_reference<const T>::type SemiFuture<T>::value() const {
260 return core_->getTry().value();
// Rvalue-qualified via(): calls setExecutor(executor, priority) and builds a
// Future<T> from this object's core_.
// NOTE(review): the statements before setExecutor and after newFuture are not
// visible in this view.
264 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
267 setExecutor(executor, priority);
269 auto newFuture = Future<T>(core_);
// Lvalue-qualified via(): a continuation is attached to this future that
// forwards its Try into promise `p`. The future obtained from `p` is then
// moved onto `executor` through the rvalue via() overload and returned.
// NOTE(review): the declaration of `p` is not visible in this view.
275 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) & {
278 auto f = p.getFuture();
// The promise is moved into the callback, which fulfils it with the result.
279 auto func = [p = std::move(p)](Try<T>&& t) mutable {
280 p.setTry(std::move(t));
282 using R = futures::detail::callableResult<T, decltype(func)>;
283 thenImplementation<decltype(func), R>(std::move(func), typename R::Arg());
284 return std::move(f).via(executor, priority);
// Reports whether the core is ready (delegates to core_->ready()).
// NOTE(review): a line before the return is not visible in this view.
288 bool SemiFuture<T>::isReady() const {
290 return core_->ready();
294 bool SemiFuture<T>::hasValue() {
295 return getTry().hasValue();
299 bool SemiFuture<T>::hasException() {
300 return getTry().hasException();
// Tells the core that this future handle is letting go of it.
// NOTE(review): the surrounding guard and any reset of core_ are not visible
// in this view.
304 void SemiFuture<T>::detach() {
306 core_->detachFuture();
// Returns a reference to the Try stored in the core.
// NOTE(review): lines before the return are not visible in this view.
312 Try<T>& SemiFuture<T>::getTry() {
315 return core_->getTry();
319 void SemiFuture<T>::throwIfInvalid() const {
325 Optional<Try<T>> SemiFuture<T>::poll() {
327 if (core_->ready()) {
328 o = std::move(core_->getTry());
334 void SemiFuture<T>::raise(exception_wrapper exception) {
335 core_->raise(std::move(exception));
// Installs `func` as the core's callback, forwarding it unchanged.
// NOTE(review): a line before the setCallback call is not visible in this view.
340 void SemiFuture<T>::setCallback_(F&& func) {
342 core_->setCallback(std::forward<F>(func));
346 SemiFuture<T>::SemiFuture(futures::detail::EmptyConstruct) noexcept
350 Future<T> Future<T>::makeEmpty() {
351 return Future<T>(futures::detail::EmptyConstruct{});
// Move constructor: delegates to the SemiFuture<T> base, handing it `other`
// as an rvalue.
355 Future<T>::Future(Future<T>&& other) noexcept
356 : SemiFuture<T>(std::move(other)) {}
359 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
360 SemiFuture<T>::operator=(SemiFuture<T>{std::move(other)});
// Converting constructor from Future<T2>. Enabled when T2 is a different type
// from T, T is constructible from T2&&, and T2&& converts implicitly to T.
// The result is produced by chaining a then() that builds T from the T2 value.
// NOTE(review): parts of the template header are not visible in this view.
367 typename std::enable_if<
368 !std::is_same<T, typename std::decay<T2>::type>::value &&
369 std::is_constructible<T, T2&&>::value &&
370 std::is_convertible<T2&&, T>::value,
372 Future<T>::Future(Future<T2>&& other)
373 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Same body, selected when T is constructible from T2&& but T2&& is NOT
// implicitly convertible to T.
378 typename std::enable_if<
379 !std::is_same<T, typename std::decay<T2>::type>::value &&
380 std::is_constructible<T, T2&&>::value &&
381 !std::is_convertible<T2&&, T>::value,
383 Future<T>::Future(Future<T2>&& other)
384 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
// Converting assignment from Future<T2>. Enabled when T2 differs from T and T
// is constructible from T2&&. Uses a then() continuation that builds T from
// the T2 value.
// NOTE(review): the statement consuming the then() result is not visible in
// this view.
389 typename std::enable_if<
390 !std::is_same<T, typename std::decay<T2>::type>::value &&
391 std::is_constructible<T, T2&&>::value,
393 Future<T>& Future<T>::operator=(Future<T2>&& other) {
395 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
398 // TODO: isSemiFuture
// Value constructors for Future<T>.
// From a value: forwards `val` to the SemiFuture<T> base constructor.
400 template <class T2, typename>
401 Future<T>::Future(T2&& val) : SemiFuture<T>(std::forward<T2>(val)) {}
// Enabled only when T2 is Unit.
// NOTE(review): the initializer of this constructor is not visible here.
404 template <typename T2>
405 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
// In-place form: requires T to be constructible from Args and forwards the
// arguments to the SemiFuture<T> in_place constructor.
411 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
413 Future<T>::Future(in_place_t, Args&&... args)
414 : SemiFuture<T>(in_place, std::forward<Args>(args)...) {}
417 Future<T>::~Future() {
424 typename std::enable_if<isFuture<F>::value,
425 Future<typename isFuture<T>::Inner>>::type
426 Future<T>::unwrap() {
427 return then([](Future<typename isFuture<T>::Inner> internal_future) {
428 return internal_future;
434 // Variant: returns a value
435 // e.g. f.then([](Try<T>&& t){ return t.value(); });
// Selected when the callback's result is not a Future
// (R::ReturnsFuture::value is false). The new promise gets this core's
// interrupt handler and the new future gets this future's executor. The
// installed callback either passes an incoming exception straight to the
// promise (when the callback takes a value rather than a Try) or fulfils the
// promise with the callback's outcome wrapped by makeTryWith.
// NOTE(review): the promise declaration, the setCallback_ call and the final
// return are not visible in this view.
437 template <typename F, typename R, bool isTry, typename... Args>
438 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
439 SemiFuture<T>::thenImplementation(
441 futures::detail::argResult<isTry, F, Args...>) {
442 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
443 typedef typename R::ReturnsFuture::Inner B;
445 this->throwIfInvalid();
448 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
450 // grab the Future now before we lose our handle on the Promise
451 auto f = p.getFuture();
452 f.core_->setExecutorNoLock(this->getExecutor());
454 /* This is a bit tricky.
456 We can't just close over *this in case this Future gets moved. So we
457 make a new dummy Future. We could figure out something more
458 sophisticated that avoids making a new Future object when it can, as an
459 optimization. But this is correct.
461 core_ can't be moved, it is explicitly disallowed (as is copying). But
462 if there's ever a reason to allow it, this is one place that makes that
463 assumption and would need to be fixed. We use a standard shared pointer
464 for core_ (by copying it in), which means in essence obj holds a shared
465 pointer to itself. But this shouldn't leak because Promise will not
466 outlive the continuation, because Promise will setException() with a
467 broken Promise if it is destructed before completed. We could use a
468 weak pointer but it would have to be converted to a shared pointer when
469 func is executed (because the Future returned by func may possibly
470 persist beyond the callback, if it gets moved), and so it is an
471 optimization to just make it shared from the get-go.
473 Two subtle but important points about this design. futures::detail::Core
474 has no back pointers to Future or Promise, so if Future or Promise get
475 moved (and they will be moved in performant code) we don't have to do
476 anything fancy. And because we store the continuation in the
477 futures::detail::Core, not in the Future, we can execute the continuation
478 even after the Future has gone out of scope. This is an intentional design
479 decision. It is likely we will want to be able to cancel a continuation
480 in some circumstances, but I think it should be explicit not implicit
481 in the destruction of the Future used to create it.
484 [state = futures::detail::makeCoreCallbackState(
485 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
487 if (!isTry && t.hasException()) {
488 state.setException(std::move(t.exception()));
490 state.setTry(makeTryWith(
491 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
497 // Variant: returns a Future
498 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
// Selected when the callback's result is itself a Future
// (R::ReturnsFuture::value is true). Set-up matches the value variant. The
// installed callback forwards an incoming exception when the callback takes a
// value, and otherwise runs the callback through tryInvoke. If that threw,
// the exception goes to the promise. If it produced a future, the promise is
// stolen and fulfilled from that future's own callback.
// NOTE(review): the promise declaration, the setCallback_ call and the final
// return are not visible in this view.
500 template <typename F, typename R, bool isTry, typename... Args>
501 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
502 SemiFuture<T>::thenImplementation(
504 futures::detail::argResult<isTry, F, Args...>) {
505 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
506 typedef typename R::ReturnsFuture::Inner B;
507 this->throwIfInvalid();
510 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
512 // grab the Future now before we lose our handle on the Promise
513 auto f = p.getFuture();
514 f.core_->setExecutorNoLock(this->getExecutor());
517 [state = futures::detail::makeCoreCallbackState(
518 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
519 if (!isTry && t.hasException()) {
520 state.setException(std::move(t.exception()));
522 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
523 if (tf2.hasException()) {
524 state.setException(std::move(tf2.exception()));
// The promise leaves `state` here and completes when the inner future does.
526 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
527 p.setTry(std::move(b));
// then() overload taking a pointer-to-member-function and the object to call
// it on. The continuation invokes (instance->*func) with either the Try or
// the contained value, chosen by whether the method's first parameter type is
// a Try.
// NOTE(review): `instance` is captured as a raw pointer, so the caller
// presumably must keep it alive until the continuation runs; confirm.
536 template <typename T>
537 template <typename R, typename Caller, typename... Args>
538 Future<typename isFuture<R>::Inner>
539 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
540 typedef typename std::remove_cv<typename std::remove_reference<
541 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
544 return then([instance, func](Try<T>&& t){
545 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
550 Future<Unit> Future<T>::then() {
551 return then([] () {});
554 // onError where the callback returns T
// Applies when F is not callable with exception_wrapper (by value or by
// reference) and does not return a Future. Exn is the callback's first
// argument type with any reference removed. When the incoming Try holds an
// exception of type Exn, the promise is fulfilled with the callback's outcome
// (wrapped by makeTryWith). Any other Try is passed through unchanged.
// NOTE(review): the promise declaration, the setCallback_ call and the final
// return are not visible in this view.
557 typename std::enable_if<
558 !futures::detail::callableWith<F, exception_wrapper>::value &&
559 !futures::detail::callableWith<F, exception_wrapper&>::value &&
560 !futures::detail::Extract<F>::ReturnsFuture::value,
562 Future<T>::onError(F&& func) {
563 typedef std::remove_reference_t<
564 typename futures::detail::Extract<F>::FirstArg>
567 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
568 "Return type of onError callback must be T or Future<T>");
571 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
572 auto f = p.getFuture();
575 [state = futures::detail::makeCoreCallbackState(
576 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
577 if (auto e = t.template tryGetExceptionObject<Exn>()) {
578 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
580 state.setTry(std::move(t));
587 // onError where the callback returns Future<T>
// Applies when F is not callable with exception_wrapper (by value or by
// reference) and returns a Future. When the incoming Try holds an exception
// of type Exn, the callback runs through tryInvoke. If it threw, that
// exception fulfils the promise. If it produced a future, the promise is
// stolen and fulfilled from that future's callback. Any other Try is passed
// through unchanged.
// NOTE(review): the promise declaration, the setCallback_ call and the final
// return are not visible in this view.
590 typename std::enable_if<
591 !futures::detail::callableWith<F, exception_wrapper>::value &&
592 !futures::detail::callableWith<F, exception_wrapper&>::value &&
593 futures::detail::Extract<F>::ReturnsFuture::value,
595 Future<T>::onError(F&& func) {
597 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
599 "Return type of onError callback must be T or Future<T>");
600 typedef std::remove_reference_t<
601 typename futures::detail::Extract<F>::FirstArg>
605 auto f = p.getFuture();
608 [state = futures::detail::makeCoreCallbackState(
609 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
610 if (auto e = t.template tryGetExceptionObject<Exn>()) {
611 auto tf2 = state.tryInvoke(*e);
612 if (tf2.hasException()) {
613 state.setException(std::move(tf2.exception()));
615 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
616 p.setTry(std::move(t3));
620 state.setTry(std::move(t));
// Chains a continuation that owns a copy/move of `func` (as funcw) and then
// re-emits the incoming Try unchanged via makeFuture.
// NOTE(review): the statement that uses funcw is not visible in this view.
629 Future<T> Future<T>::ensure(F&& func) {
630 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
632 return makeFuture(std::move(t));
638 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
639 return within(dur, tk).onError([funcw = std::forward<F>(func)](
640 TimedOut const&) { return std::move(funcw)(); });
// onError for callbacks that accept an exception_wrapper and return a Future.
// When the incoming Try holds an exception, the callback runs through
// tryInvoke with that exception. If it threw, the new exception fulfils the
// promise. If it produced a future, the promise is stolen and fulfilled from
// that future's callback. A Try without an exception is passed through
// unchanged.
// NOTE(review): the promise declaration, the setCallback_ call and the final
// return are not visible in this view.
645 typename std::enable_if<
646 futures::detail::callableWith<F, exception_wrapper>::value &&
647 futures::detail::Extract<F>::ReturnsFuture::value,
649 Future<T>::onError(F&& func) {
651 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
653 "Return type of onError callback must be T or Future<T>");
656 auto f = p.getFuture();
// Unlike the sibling overloads, this callback takes the Try by value.
658 [state = futures::detail::makeCoreCallbackState(
659 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
660 if (t.hasException()) {
661 auto tf2 = state.tryInvoke(std::move(t.exception()));
662 if (tf2.hasException()) {
663 state.setException(std::move(tf2.exception()));
665 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
666 p.setTry(std::move(t3));
670 state.setTry(std::move(t));
677 // onError(exception_wrapper) that returns T
// Applies when F accepts an exception_wrapper and does not return a Future.
// When the incoming Try holds an exception, the promise is fulfilled with the
// callback's outcome (wrapped by makeTryWith). Otherwise the Try is passed
// through unchanged.
// NOTE(review): the static_assert compares Extract<F>::Return with Future<T>
// although this overload is for value-returning callbacks; presumably
// Extract<F>::Return is already the Future-wrapped type. Confirm.
// NOTE(review): the promise declaration, the setCallback_ call and the final
// return are not visible in this view.
680 typename std::enable_if<
681 futures::detail::callableWith<F, exception_wrapper>::value &&
682 !futures::detail::Extract<F>::ReturnsFuture::value,
684 Future<T>::onError(F&& func) {
686 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
688 "Return type of onError callback must be T or Future<T>");
691 auto f = p.getFuture();
693 [state = futures::detail::makeCoreCallbackState(
694 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
695 if (t.hasException()) {
696 state.setTry(makeTryWith(
697 [&] { return state.invoke(std::move(t.exception())); }));
699 state.setTry(std::move(t));
707 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
708 return waitVia(e).getTry();
711 template <class Func>
712 auto via(Executor* x, Func&& func)
713 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
714 // TODO make this actually more performant. :-P #7260175
715 return via(x).then(std::forward<Func>(func));
// Tag constructor: forwards an EmptyConstruct tag to the SemiFuture<T> base.
719 Future<T>::Future(futures::detail::EmptyConstruct) noexcept
720 : SemiFuture<T>(futures::detail::EmptyConstruct{}) {}
725 Future<typename std::decay<T>::type> makeFuture(T&& t) {
726 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
729 inline Future<Unit> makeFuture() {
730 return makeFuture(Unit{});
733 // makeFutureWith(Future<T>()) -> Future<T>
// Overload selected when F() already yields a Future: the callable's result
// is returned as-is. If the call throws, the error is wrapped in an
// exception_wrapper and returned as a Future<InnerType>. For
// std::exception-derived errors the caught reference is passed along with
// std::current_exception().
// NOTE(review): the `try`, the second catch clause and the head of the
// InnerType alias are not visible in this view.
735 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
736 typename std::result_of<F()>::type>::type
737 makeFutureWith(F&& func) {
739 typename isFuture<typename std::result_of<F()>::type>::Inner;
741 return std::forward<F>(func)();
742 } catch (std::exception& e) {
743 return makeFuture<InnerType>(
744 exception_wrapper(std::current_exception(), e));
746 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
750 // makeFutureWith(T()) -> Future<T>
751 // makeFutureWith(void()) -> Future<Unit>
753 typename std::enable_if<
754 !(isFuture<typename std::result_of<F()>::type>::value),
755 Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
756 makeFutureWith(F&& func) {
757 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
758 return makeFuture<LiftedResult>(
759 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
763 Future<T> makeFuture(std::exception_ptr const& e) {
764 return makeFuture(Try<T>(e));
768 Future<T> makeFuture(exception_wrapper ew) {
769 return makeFuture(Try<T>(std::move(ew)));
772 template <class T, class E>
773 typename std::enable_if<std::is_base_of<std::exception, E>::value,
775 makeFuture(E const& e) {
776 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
780 Future<T> makeFuture(Try<T>&& t) {
781 return Future<T>(new futures::detail::Core<T>(std::move(t)));
785 Future<Unit> via(Executor* executor, int8_t priority) {
786 return makeFuture().via(executor, priority);
789 // mapSetCallback calls func(i, Try<T>) when every future completes
791 template <class T, class InputIterator, class F>
792 void mapSetCallback(InputIterator first, InputIterator last, F func) {
793 for (size_t i = 0; first != last; ++first, ++i) {
794 first->setCallback_([func, i](Try<T>&& t) {
795 func(i, std::move(t));
800 // collectAll (variadic)
802 template <typename... Fs>
803 typename futures::detail::CollectAllVariadicContext<
804 typename std::decay<Fs>::type::value_type...>::type
805 collectAll(Fs&&... fs) {
806 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
807 typename std::decay<Fs>::type::value_type...>>();
808 futures::detail::collectVariadicHelper<
809 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
810 return ctx->p.getFuture();
813 // collectAll (iterator)
// Collects the Try of every future in [first, last) into a vector, indexed by
// the future's position in the range.
// NOTE(review): the head of the return type and of the typedef for T are not
// visible in this view.
815 template <class InputIterator>
818 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
819 collectAll(InputIterator first, InputIterator last) {
821 typename std::iterator_traits<InputIterator>::value_type::value_type T;
// Shared context: `results` is pre-sized to the number of futures, and the
// promise is fulfilled from the destructor, i.e. once the last holder of the
// context (the callbacks capture it by shared_ptr) lets go.
823 struct CollectAllContext {
824 CollectAllContext(size_t n) : results(n) {}
825 ~CollectAllContext() {
826 p.setValue(std::move(results));
828 Promise<std::vector<Try<T>>> p;
829 std::vector<Try<T>> results;
833 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
// Each callback writes only its own slot i.
834 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
835 ctx->results[i] = std::move(t);
837 return ctx->p.getFuture();
840 // collect (iterator)
// Shared state used by collect(first, last). Result and InternalResult are
// chosen with std::conditional on whether T is void; for non-void T they are
// std::vector<T> and std::vector<Optional<T>> respectively.
// NOTE(review): the void-case alternatives, the destructor header and the
// promise member `p` are not visible in this view.
845 template <typename T>
846 struct CollectContext {
848 explicit Nothing(int /* n */) {}
851 using Result = typename std::conditional<
852 std::is_void<T>::value,
854 std::vector<T>>::type;
856 using InternalResult = typename std::conditional<
857 std::is_void<T>::value,
859 std::vector<Optional<T>>>::type;
// Pre-sizes `result` with one slot per future.
861 explicit CollectContext(size_t n) : result(n) {}
// Runs only when `threw` was still false (exchange() sets it to true): the
// stored optionals are moved into a plain vector that fulfils the promise.
863 if (!threw.exchange(true)) {
864 // map Optional<T> -> T
865 std::vector<T> finalResult;
866 finalResult.reserve(result.size());
867 std::transform(result.begin(), result.end(),
868 std::back_inserter(finalResult),
869 [](Optional<T>& o) { return std::move(o.value()); });
870 p.setValue(std::move(finalResult));
// Stores the value of `t` into slot i; `t` must hold a value.
873 inline void setPartialResult(size_t i, Try<T>& t) {
874 result[i] = std::move(t.value());
877 InternalResult result;
// Set once by whichever path delivers the final outcome first.
878 std::atomic<bool> threw {false};
881 } // namespace detail
882 } // namespace futures
884 template <class InputIterator>
885 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
886 InputIterator>::value_type::value_type>::Result>
887 collect(InputIterator first, InputIterator last) {
889 typename std::iterator_traits<InputIterator>::value_type::value_type T;
891 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
892 std::distance(first, last));
893 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
894 if (t.hasException()) {
895 if (!ctx->threw.exchange(true)) {
896 ctx->p.setException(std::move(t.exception()));
898 } else if (!ctx->threw) {
899 ctx->setPartialResult(i, t);
902 return ctx->p.getFuture();
905 // collect (variadic)
907 template <typename... Fs>
908 typename futures::detail::CollectVariadicContext<
909 typename std::decay<Fs>::type::value_type...>::type
910 collect(Fs&&... fs) {
911 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
912 typename std::decay<Fs>::type::value_type...>>();
913 futures::detail::collectVariadicHelper<
914 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
915 return ctx->p.getFuture();
918 // collectAny (iterator)
// Fulfils the result with (index, Try) of the first future in [first, last)
// whose callback runs. Later completions are ignored.
// NOTE(review): the head of the return type and of the typedef for T are not
// visible in this view.
920 template <class InputIterator>
925 std::iterator_traits<InputIterator>::value_type::value_type>>>
926 collectAny(InputIterator first, InputIterator last) {
928 typename std::iterator_traits<InputIterator>::value_type::value_type T;
930 struct CollectAnyContext {
931 CollectAnyContext() {}
932 Promise<std::pair<size_t, Try<T>>> p;
// Flipped by the first callback to arrive; exchange() tells later ones to
// back off.
933 std::atomic<bool> done {false};
936 auto ctx = std::make_shared<CollectAnyContext>();
937 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
938 if (!ctx->done.exchange(true)) {
939 ctx->p.setValue(std::make_pair(i, std::move(t)));
942 return ctx->p.getFuture();
945 // collectAnyWithoutException (iterator)
// Fulfils the result with (index, value) of the first future in
// [first, last) that completes without an exception. Every other completion
// bumps nFulfilled, and the one that brings it up to nTotal sets the
// exception of its Try on the promise.
// NOTE(review): the head of the return type, the typedef head and the
// declaration of nTotal are not visible in this view.
947 template <class InputIterator>
950 typename std::iterator_traits<InputIterator>::value_type::value_type>>
951 collectAnyWithoutException(InputIterator first, InputIterator last) {
953 typename std::iterator_traits<InputIterator>::value_type::value_type T;
955 struct CollectAnyWithoutExceptionContext {
956 CollectAnyWithoutExceptionContext(){}
957 Promise<std::pair<size_t, T>> p;
958 std::atomic<bool> done{false};
959 std::atomic<size_t> nFulfilled{0};
963 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
964 ctx->nTotal = size_t(std::distance(first, last));
966 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
// The first successful completion does not touch nFulfilled, so the counter
// can only reach nTotal when no completion took this branch.
967 if (!t.hasException() && !ctx->done.exchange(true)) {
968 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
969 } else if (++ctx->nFulfilled == ctx->nTotal) {
970 ctx->p.setException(t.exception());
973 return ctx->p.getFuture();
976 // collectN (iterator)
// Fulfils the result with the (index, Try) pairs of completed futures once n
// of them have been recorded. If the range holds fewer than n futures, the
// promise is given a std::runtime_error instead.
// NOTE(review): the context's promise and vector members and the conditions
// guarding the emplace_back / setTry are not visible in this view.
// NOTE(review): ctx->v is appended to from the futures' callbacks; confirm
// whether those callbacks can run concurrently.
978 template <class InputIterator>
979 Future<std::vector<std::pair<size_t, Try<typename
980 std::iterator_traits<InputIterator>::value_type::value_type>>>>
981 collectN(InputIterator first, InputIterator last, size_t n) {
983 std::iterator_traits<InputIterator>::value_type::value_type T;
984 typedef std::vector<std::pair<size_t, Try<T>>> V;
986 struct CollectNContext {
988 std::atomic<size_t> completed = {0};
991 auto ctx = std::make_shared<CollectNContext>();
993 if (size_t(std::distance(first, last)) < n) {
994 ctx->p.setException(std::runtime_error("Not enough futures"));
996 // for each completed Future, increase count and add to vector, until we
997 // have n completed futures at which point we fulfil our Promise with the
999 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
// `c` is this completion's 1-based arrival number.
1000 auto c = ++ctx->completed;
1002 assert(ctx->v.size() < n);
1003 ctx->v.emplace_back(i, std::move(t));
1005 ctx->p.setTry(Try<V>(std::move(ctx->v)));
1011 return ctx->p.getFuture();
1014 // reduce (iterator)
// Folds the futures in [first, last) from left to right with `func`, starting
// from `initial`. An empty range yields a future of `initial`. `func` is kept
// in a shared_ptr so every chained continuation can use it. Whether `func`
// receives Try<ItT> or ItT is decided by callableWith<F, T&&, Try<ItT>&&>.
// NOTE(review): the remaining std::conditional arguments, the call inside the
// first continuation and the final return are not visible in this view.
1016 template <class It, class T, class F>
1017 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1018 if (first == last) {
1019 return makeFuture(std::move(initial));
1022 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1023 typedef typename std::conditional<
1024 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1027 typedef isTry<Arg> IsTry;
1029 auto sfunc = std::make_shared<F>(std::move(func));
// The first element is combined with `initial`.
1031 auto f = first->then(
1032 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1034 std::move(minitial), head.template get<IsTry::value, Arg&&>());
// Each further element is combined with the running result once both the
// running future and that element are complete.
1037 for (++first; first != last; ++first) {
1038 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1039 return (*sfunc)(std::move(std::get<0>(t).value()),
1040 // Either return a ItT&& or a Try<ItT>&& depending
1041 // on the type of the argument of func.
1042 std::get<1>(t).template get<IsTry::value, Arg&&>());
1049 // window (collection)
// Applies `func` to every element of `input`, starting at most n calls up
// front. Each completion fulfils the promise for its index and then spawns
// the call for the next unclaimed index. Returns one future per input
// element, in input order.
// NOTE(review): the input_/func_ member declarations and the final return are
// not visible in this view.
1051 template <class Collection, class F, class ItT, class Result>
1052 std::vector<Future<Result>>
1053 window(Collection input, F func, size_t n) {
1054 struct WindowContext {
1055 WindowContext(Collection&& i, F&& fn)
1056 : input_(std::move(i)), promises_(input_.size()),
1057 func_(std::move(fn))
// Next input index to hand out; claimed atomically in spawn().
1059 std::atomic<size_t> i_ {0};
1061 std::vector<Promise<Result>> promises_;
// Claims the next index. If it is within range, runs func_ on that element
// and chains the follow-up spawn onto its completion.
1064 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
1065 size_t i = ctx->i_++;
1066 if (i < ctx->input_.size()) {
1067 // Using setCallback_ directly since we don't need the Future
1068 ctx->func_(std::move(ctx->input_[i])).setCallback_(
1069 // ctx is captured by value
1070 [ctx, i](Try<Result>&& t) {
1071 ctx->promises_[i].setTry(std::move(t));
1072 // Chain another future onto this one
1073 spawn(std::move(ctx));
// Never start more calls than there are inputs.
1079 auto max = std::min(n, input.size());
1081 auto ctx = std::make_shared<WindowContext>(
1082 std::move(input), std::move(func));
1084 for (size_t i = 0; i < max; ++i) {
1085 // Start the first n Futures
1086 WindowContext::spawn(ctx);
1089 std::vector<Future<Result>> futures;
1090 futures.reserve(ctx->promises_.size());
1091 for (auto& promise : ctx->promises_) {
1092 futures.emplace_back(promise.getFuture());
// Member reduce(): for a future whose value is a range, folds its elements
// with `func`, starting from `initial`. Both `initial` and `func` are
// captured into the continuation.
// NOTE(review): the enclosing then() call and the return of `ret` are not
// visible in this view.
1101 template <class I, class F>
1102 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1104 minitial = std::forward<I>(initial),
1105 mfunc = std::forward<F>(func)
1106 ](T& vals) mutable {
1107 auto ret = std::move(minitial);
// The accumulator and each element are passed to mfunc as rvalues.
1108 for (auto& val : vals) {
1109 ret = mfunc(std::move(ret), std::move(val));
1115 // unorderedReduce (iterator)
// Reduces the futures in [first, last) with `func` in completion order rather
// than in range order. An empty range yields a future of `initial`.
// NOTE(review): the memo_/func_ member declarations, the assignment that
// stores the new chain and the iterator arguments passed to mapSetCallback
// are not visible in this view.
1117 template <class It, class T, class F, class ItT, class Arg>
1118 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1119 if (first == last) {
1120 return makeFuture(std::move(initial));
1123 typedef isTry<Arg> IsTry;
1125 struct UnorderedReduceContext {
1126 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1127 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1128 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1130 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1133 size_t numThens_; // how many Futures completed and called .then()
1134 size_t numFutures_; // how many Futures in total
1135 Promise<T> promise_;
1138 auto ctx = std::make_shared<UnorderedReduceContext>(
1139 std::move(initial), std::move(func), std::distance(first, last));
1141 mapSetCallback<ItT>(
1144 [ctx](size_t /* i */, Try<ItT>&& t) {
1145 // Futures can be completed in any order, simultaneously.
1146 // To make this non-blocking, we create a new Future chain in
1147 // the order of completion to reduce the values.
1148 // The spinlock just protects chaining a new Future, not actually
1149 // executing the reduce, which should be really fast.
1150 folly::MSLGuard lock(ctx->lock_);
1152 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1153 // Either return a ItT&& or a Try<ItT>&& depending
1154 // on the type of the argument of func.
1155 return ctx->func_(std::move(v),
1156 mt.template get<IsTry::value, Arg&&>());
// The completion that brings numThens_ up to numFutures_ wires the end of the
// chain to the outer promise.
1158 if (++ctx->numThens_ == ctx->numFutures_) {
1159 // After reducing the value of the last Future, fulfill the Promise
1160 ctx->memo_.setCallback_(
1161 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1165 return ctx->promise_.getFuture();
1171 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1172 return within(dur, TimedOut(), tk);
// Returns a future that completes with this future's result, or with an
// exception if the timekeeper's after(dur) future fires first. The `token`
// flag decides which side gets to fulfil the shared promise. An already-ready
// future is returned directly.
// NOTE(review): the Context struct header, its exception/promise members, the
// null check on `tk` and the null check on lockedCtx are not visible in this
// view.
1177 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1180 Context(E ex) : exception(std::move(ex)), promise() {}
1182 Future<Unit> thisFuture;
1184 std::atomic<bool> token {false};
1187 if (this->isReady()) {
1188 return std::move(*this);
// `tks` keeps the singleton timekeeper alive for the rest of this call.
1191 std::shared_ptr<Timekeeper> tks;
1193 tks = folly::detail::getTimekeeperSingleton();
1194 tk = DCHECK_NOTNULL(tks.get());
1197 auto ctx = std::make_shared<Context>(std::move(e));
// Result path: the first side to flip `token` fulfils the promise.
1199 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1200 if (ctx->token.exchange(true) == false) {
1201 ctx->promise.setTry(std::move(t));
1205 // Have time keeper use a weak ptr to hold ctx,
1206 // so that ctx can be deallocated as soon as the future job finished.
1207 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1208 auto lockedCtx = weakCtx.lock();
1210 // ctx already released. "this" completed first, cancel "after"
1213 // "after" completed first, cancel "this"
1214 lockedCtx->thisFuture.raise(TimedOut());
// Timer path: a timer failure is reported as-is, otherwise the caller's
// exception `e` is used.
1215 if (lockedCtx->token.exchange(true) == false) {
1216 if (t.hasException()) {
1217 lockedCtx->promise.setException(std::move(t.exception()));
1219 lockedCtx->promise.setException(std::move(lockedCtx->exception));
// The returned future is put on this future's executor.
1224 return ctx->promise.getFuture().via(this->getExecutor());
1230 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1231 return collectAll(*this, futures::sleep(dur, tk))
1232 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1233 Try<T>& t = std::get<0>(tup);
1234 return makeFuture<T>(std::move(t));
// Untimed wait helper: returns immediately when `f` is ready. Otherwise a
// callback that posts `baton` is installed on `f`, and `f` is asserted ready
// before returning.
// NOTE(review): the statement that waits on the baton is not visible in this
// view.
1241 template <class FutureType, typename T = typename FutureType::value_type>
1242 void waitImpl(FutureType& f) {
1243 // short-circuit if there's nothing to do
1244 if (f.isReady()) return;
1246 FutureBatonType baton;
// `baton` is captured by reference, so it must outlive the callback.
1247 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1249 assert(f.isReady());
// Helper behind the wait(Duration) overloads. In contrast to the untimed
// overload, the baton is heap-allocated and captured by value (shared_ptr)
// and a promise is moved into the callback, so the callback holds no
// reference to this stack frame -- presumably so it stays valid if the timed
// wait gives up first; confirm.
// NOTE(review): several lines of this definition (the early-return guard, the
// promise declaration, the tail of the callback and whatever is done with
// `ret`) are not visible in this listing.
1252 template <class FutureType, typename T = typename FutureType::value_type>
1253 void waitImpl(FutureType& f, Duration dur) {
1254 // short-circuit if there's nothing to do
// ret is the future side of the promise that the callback fulfils below.
1260 auto ret = promise.getFuture();
1261 auto baton = std::make_shared<FutureBatonType>();
// The callback forwards f's result into the captured promise.
1262 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1263 promise.setTry(std::move(t));
// The readiness assert runs only when timed_wait(dur) returns true.
1267 if (baton->timed_wait(dur)) {
1268 assert(f.isReady());
// Helper behind waitVia(): re-routes f through executor e and loops until f
// reports ready.
// NOTE(review): the lines between the comment below and the reassignment of
// f, and the body of the while loop, are not visible in this listing;
// presumably the loop body drives e -- confirm against the full file.
1273 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1274 // Set callback so to ensure that the via executor has something on it
1275 // so that once the preceding future triggers this callback, drive will
1276 // always have a callback to satisfy it
// f is replaced by the future returned from chaining an identity continuation
// (moves the value through unchanged) after via(e).
1279 f = f.via(e).then([](T&& t) { return std::move(t); });
1280 while (!f.isReady()) {
1283 assert(f.isReady());
1286 } // namespace detail
1287 } // namespace futures
// Lvalue-qualified wait(): delegates to futures::detail::waitImpl(*this).
// Declared to return SemiFuture<T>&.
1290 SemiFuture<T>& SemiFuture<T>::wait() & {
1291 futures::detail::waitImpl(*this);
// Rvalue-qualified wait(): delegates to futures::detail::waitImpl(*this) and
// then returns *this as an rvalue reference.
1296 SemiFuture<T>&& SemiFuture<T>::wait() && {
1297 futures::detail::waitImpl(*this);
1298 return std::move(*this);
// Lvalue-qualified timed wait: delegates to
// futures::detail::waitImpl(*this, dur). Declared to return SemiFuture<T>&.
1302 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1303 futures::detail::waitImpl(*this, dur);
// Rvalue-qualified timed wait: delegates to
// futures::detail::waitImpl(*this, dur) and then returns *this as an rvalue
// reference.
1308 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1309 futures::detail::waitImpl(*this, dur);
1310 return std::move(*this);
// Untimed get(): calls wait(), then moves the result of value() out as the
// return value.
1314 T SemiFuture<T>::get() {
1315 return std::move(wait().value());
// Timed get(): when this future is ready, moves its value out and returns it.
// NOTE(review): the statement preceding the readiness check and the path
// taken when the future is not ready are not visible in this listing;
// presumably a wait(dur) call comes first and the not-ready path reports a
// timeout -- confirm against the full file.
1319 T SemiFuture<T>::get(Duration dur) {
1321 if (this->isReady()) {
1322 return std::move(this->value());
// Lvalue-qualified wait(): delegates to futures::detail::waitImpl(*this).
// Declared to return Future<T>&.
1329 Future<T>& Future<T>::wait() & {
1330 futures::detail::waitImpl(*this);
// Rvalue-qualified wait(): delegates to futures::detail::waitImpl(*this) and
// then returns *this as an rvalue reference.
1335 Future<T>&& Future<T>::wait() && {
1336 futures::detail::waitImpl(*this);
1337 return std::move(*this);
// Lvalue-qualified timed wait: delegates to
// futures::detail::waitImpl(*this, dur). Declared to return Future<T>&.
1341 Future<T>& Future<T>::wait(Duration dur) & {
1342 futures::detail::waitImpl(*this, dur);
// Rvalue-qualified timed wait: delegates to
// futures::detail::waitImpl(*this, dur) and then returns *this as an rvalue
// reference.
1347 Future<T>&& Future<T>::wait(Duration dur) && {
1348 futures::detail::waitImpl(*this, dur);
1349 return std::move(*this);
// Lvalue-qualified waitVia(): delegates to
// futures::detail::waitViaImpl(*this, e). Declared to return Future<T>&.
1353 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1354 futures::detail::waitViaImpl(*this, e);
// Rvalue-qualified waitVia(): delegates to
// futures::detail::waitViaImpl(*this, e) and then returns *this as an rvalue
// reference.
1359 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1360 futures::detail::waitViaImpl(*this, e);
1361 return std::move(*this);
// Calls waitVia(e), then moves the result of value() out as the return value.
1365 T Future<T>::getVia(DrivableExecutor* e) {
1366 return std::move(waitVia(e).value());
// Compares the values held by two Try<T> objects with operator==. It calls
// value() on both arguments unconditionally.
// NOTE(review): the enclosing type's header is not visible in this listing;
// willEqual() refers to it as futures::detail::TryEquals<T>.
1373 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1374 return t1.value() == t2.value();
1377 } // namespace detail
1378 } // namespace futures
// Returns a Future<bool> built from collectAll(*this, f). When both Try
// results hold a value, the continuation returns the result of
// futures::detail::TryEquals<T>::equals on them.
// NOTE(review): the result produced when either Try lacks a value is not
// visible in this listing -- confirm against the full file.
1381 Future<bool> Future<T>::willEqual(Future<T>& f) {
1382 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1383 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1384 return futures::detail::TryEquals<T>::equals(
1385 std::get<0>(t), std::get<1>(t));
// Chains a continuation that tests the value against the captured predicate
// and calls throwPredicateDoesNotObtain() when the predicate returns false.
// NOTE(review): the statement executed when the predicate passes is not
// visible in this listing; presumably the value is returned -- confirm.
1394 Future<T> Future<T>::filter(F&& predicate) {
1395 return this->then([p = std::forward<F>(predicate)](T val) {
// The predicate is handed a const reference, so it cannot modify val.
1396 T const& valConstRef = val;
1397 if (!p(valConstRef)) {
1398 throwPredicateDoesNotObtain();
// If p is true, invokes thunk and returns the result's .unit(); otherwise
// returns makeFuture() without invoking thunk.
1405 inline Future<Unit> when(bool p, F&& thunk) {
1406 return p ? std::forward<F>(thunk)().unit() : makeFuture();
// Asynchronous loop helper. The visible body invokes thunk, chains a
// continuation that calls whileDo again with the captured predicate and
// thunk, and has a separate path that returns makeFuture().
// NOTE(review): the condition selecting between the two paths and the header
// of the lambda are not visible in this listing; presumably the recursive
// path is taken while predicate() holds -- confirm against the full file.
1409 template <class P, class F>
1410 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1412 auto future = thunk();
1413 return future.then([
// predicate and thunk are forwarded into the closure so the continuation
// carries its own copies into the next whileDo call.
1414 predicate = std::forward<P>(predicate),
1415 thunk = std::forward<F>(thunk)
1417 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1420 return makeFuture();
// Implemented on top of folly::whileDo: thunk is forwarded unchanged and the
// predicate is a counting lambda.
1424 Future<Unit> times(const int n, F&& thunk) {
1425 return folly::whileDo(
// The counter is a heap-allocated std::atomic<int> starting at 0 and owned by
// the lambda. fetch_add(1) yields the value before the increment, so the
// predicate returns true for its first n calls (never, if n <= 0).
1426 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1427 return count->fetch_add(1) < n;
1429 std::forward<F>(thunk));
// Walks [first, last) and, for each element, appends it->then(func) to a
// vector of Future<Result>. func is taken by value and passed to every then()
// call.
// NOTE(review): the statement returning the vector is not visible in this
// listing.
1433 template <class It, class F, class ItT, class Result>
1434 std::vector<Future<Result>> map(It first, It last, F func) {
1435 std::vector<Future<Result>> results;
1436 for (auto it = first; it != last; it++) {
1437 results.push_back(it->then(func));
1443 // Instantiate the most common Future types to save compile time
// These are explicit instantiation declarations: they suppress implicit
// instantiation of the listed specializations in translation units that
// include this file, so a matching explicit instantiation definition must
// exist elsewhere in the build.
1444 extern template class Future<Unit>;
1445 extern template class Future<bool>;
1446 extern template class Future<int>;
1447 extern template class Future<int64_t>;
1448 extern template class Future<std::string>;
1449 extern template class Future<double>;
1450 } // namespace folly