2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
24 #include <folly/Baton.h>
25 #include <folly/Optional.h>
26 #include <folly/futures/Timekeeper.h>
27 #include <folly/futures/detail/Core.h>
29 #ifndef FOLLY_FUTURE_USING_FIBER
30 #if FOLLY_MOBILE || defined(__APPLE__)
31 #define FOLLY_FUTURE_USING_FIBER 0
33 #define FOLLY_FUTURE_USING_FIBER 1
34 #include <folly/fibers/Baton.h>
44 #if FOLLY_FUTURE_USING_FIBER
45 typedef folly::fibers::Baton FutureBatonType;
47 typedef folly::Baton<> FutureBatonType;
50 } // namespace futures
53 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
58 // Guarantees that the stored functor is destructed before the stored promise
59 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
60 template <typename T, typename F>
61 class CoreCallbackState {
63 template <typename FF>
64 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
65 noexcept(F(std::declval<FF>())))
66 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
67 assert(before_barrier());
70 CoreCallbackState(CoreCallbackState&& that) noexcept(
71 noexcept(F(std::declval<F>()))) {
72 if (that.before_barrier()) {
73 new (&func_) F(std::move(that.func_));
74 promise_ = that.stealPromise();
78 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
80 ~CoreCallbackState() {
81 if (before_barrier()) {
86 template <typename... Args>
87 auto invoke(Args&&... args) noexcept(
88 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
89 assert(before_barrier());
90 return std::move(func_)(std::forward<Args>(args)...);
93 template <typename... Args>
94 auto tryInvoke(Args&&... args) noexcept {
95 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
98 void setTry(Try<T>&& t) {
99 stealPromise().setTry(std::move(t));
102 void setException(exception_wrapper&& ew) {
103 stealPromise().setException(std::move(ew));
106 Promise<T> stealPromise() noexcept {
107 assert(before_barrier());
109 return std::move(promise_);
113 bool before_barrier() const noexcept {
114 return !promise_.isFulfilled();
120 Promise<T> promise_{Promise<T>::makeEmpty()};
123 template <typename T, typename F>
124 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
125 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
126 std::declval<Promise<T>&&>(),
127 std::declval<F&&>()))) {
128 return CoreCallbackState<T, _t<std::decay<F>>>(
129 std::move(p), std::forward<F>(f));
131 } // namespace detail
132 } // namespace futures
135 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
136 return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
139 inline SemiFuture<Unit> makeSemiFuture() {
140 return makeSemiFuture(Unit{});
143 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
145 typename std::enable_if<
146 isSemiFuture<typename std::result_of<F()>::type>::value,
147 typename std::result_of<F()>::type>::type
148 makeSemiFutureWith(F&& func) {
150 typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
152 return std::forward<F>(func)();
153 } catch (std::exception& e) {
154 return makeSemiFuture<InnerType>(
155 exception_wrapper(std::current_exception(), e));
157 return makeSemiFuture<InnerType>(
158 exception_wrapper(std::current_exception()));
162 // makeSemiFutureWith(T()) -> SemiFuture<T>
163 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
165 typename std::enable_if<
166 !(isSemiFuture<typename std::result_of<F()>::type>::value),
167 SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
168 makeSemiFutureWith(F&& func) {
169 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
170 return makeSemiFuture<LiftedResult>(
171 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
175 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
176 return makeSemiFuture(Try<T>(e));
180 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
181 return makeSemiFuture(Try<T>(std::move(ew)));
184 template <class T, class E>
186 enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
187 makeSemiFuture(E const& e) {
188 return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
192 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
193 return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
197 SemiFuture<T> SemiFuture<T>::makeEmpty() {
198 return SemiFuture<T>(futures::detail::EmptyConstruct{});
202 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept : core_(other.core_) {
203 other.core_ = nullptr;
207 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
208 std::swap(core_, other.core_);
213 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept : core_(other.core_) {
214 other.core_ = nullptr;
218 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
219 std::swap(core_, other.core_);
224 template <class T2, typename>
225 SemiFuture<T>::SemiFuture(T2&& val)
226 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
229 template <typename T2>
230 SemiFuture<T>::SemiFuture(
231 typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
232 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
237 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
239 SemiFuture<T>::SemiFuture(in_place_t, Args&&... args)
241 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
245 SemiFuture<T>::~SemiFuture() {
250 T& SemiFuture<T>::value() & {
253 return core_->getTry().value();
257 T const& SemiFuture<T>::value() const& {
260 return core_->getTry().value();
264 T&& SemiFuture<T>::value() && {
267 return std::move(core_->getTry().value());
271 T const&& SemiFuture<T>::value() const&& {
274 return std::move(core_->getTry().value());
278 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
281 setExecutor(executor, priority);
283 auto newFuture = Future<T>(core_);
289 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) & {
292 auto f = p.getFuture();
293 auto func = [p = std::move(p)](Try<T>&& t) mutable {
294 p.setTry(std::move(t));
296 using R = futures::detail::callableResult<T, decltype(func)>;
297 thenImplementation<decltype(func), R>(std::move(func), typename R::Arg());
298 return std::move(f).via(executor, priority);
302 bool SemiFuture<T>::isReady() const {
304 return core_->ready();
308 bool SemiFuture<T>::hasValue() {
309 return getTry().hasValue();
313 bool SemiFuture<T>::hasException() {
314 return getTry().hasException();
318 void SemiFuture<T>::detach() {
320 core_->detachFuture();
326 Try<T>& SemiFuture<T>::getTry() {
329 return core_->getTry();
333 void SemiFuture<T>::throwIfInvalid() const {
339 Optional<Try<T>> SemiFuture<T>::poll() {
341 if (core_->ready()) {
342 o = std::move(core_->getTry());
348 void SemiFuture<T>::raise(exception_wrapper exception) {
349 core_->raise(std::move(exception));
354 void SemiFuture<T>::setCallback_(F&& func) {
356 core_->setCallback(std::forward<F>(func));
360 SemiFuture<T>::SemiFuture(futures::detail::EmptyConstruct) noexcept
364 Future<T> Future<T>::makeEmpty() {
365 return Future<T>(futures::detail::EmptyConstruct{});
369 Future<T>::Future(Future<T>&& other) noexcept
370 : SemiFuture<T>(std::move(other)) {}
373 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
374 SemiFuture<T>::operator=(SemiFuture<T>{std::move(other)});
381 typename std::enable_if<
382 !std::is_same<T, typename std::decay<T2>::type>::value &&
383 std::is_constructible<T, T2&&>::value &&
384 std::is_convertible<T2&&, T>::value,
386 Future<T>::Future(Future<T2>&& other)
387 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
392 typename std::enable_if<
393 !std::is_same<T, typename std::decay<T2>::type>::value &&
394 std::is_constructible<T, T2&&>::value &&
395 !std::is_convertible<T2&&, T>::value,
397 Future<T>::Future(Future<T2>&& other)
398 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
403 typename std::enable_if<
404 !std::is_same<T, typename std::decay<T2>::type>::value &&
405 std::is_constructible<T, T2&&>::value,
407 Future<T>& Future<T>::operator=(Future<T2>&& other) {
409 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
412 // TODO: isSemiFuture
414 template <class T2, typename>
415 Future<T>::Future(T2&& val) : SemiFuture<T>(std::forward<T2>(val)) {}
418 template <typename T2>
419 Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
425 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
427 Future<T>::Future(in_place_t, Args&&... args)
428 : SemiFuture<T>(in_place, std::forward<Args>(args)...) {}
431 Future<T>::~Future() {
438 typename std::enable_if<isFuture<F>::value,
439 Future<typename isFuture<T>::Inner>>::type
440 Future<T>::unwrap() {
441 return then([](Future<typename isFuture<T>::Inner> internal_future) {
442 return internal_future;
448 // Variant: returns a value
449 // e.g. f.then([](Try<T>&& t){ return t.value(); });
451 template <typename F, typename R, bool isTry, typename... Args>
452 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
453 SemiFuture<T>::thenImplementation(
455 futures::detail::argResult<isTry, F, Args...>) {
456 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
457 typedef typename R::ReturnsFuture::Inner B;
459 this->throwIfInvalid();
462 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
464 // grab the Future now before we lose our handle on the Promise
465 auto f = p.getFuture();
466 f.core_->setExecutorNoLock(this->getExecutor());
468 /* This is a bit tricky.
470 We can't just close over *this in case this Future gets moved. So we
471 make a new dummy Future. We could figure out something more
472 sophisticated that avoids making a new Future object when it can, as an
473 optimization. But this is correct.
475 core_ can't be moved, it is explicitly disallowed (as is copying). But
476 if there's ever a reason to allow it, this is one place that makes that
477 assumption and would need to be fixed. We use a standard shared pointer
478 for core_ (by copying it in), which means in essence obj holds a shared
479 pointer to itself. But this shouldn't leak because Promise will not
480 outlive the continuation, because Promise will setException() with a
481 broken Promise if it is destructed before completed. We could use a
482 weak pointer but it would have to be converted to a shared pointer when
483 func is executed (because the Future returned by func may possibly
484 persist beyond the callback, if it gets moved), and so it is an
485 optimization to just make it shared from the get-go.
487 Two subtle but important points about this design. futures::detail::Core
488 has no back pointers to Future or Promise, so if Future or Promise get
489 moved (and they will be moved in performant code) we don't have to do
490 anything fancy. And because we store the continuation in the
491 futures::detail::Core, not in the Future, we can execute the continuation
492 even after the Future has gone out of scope. This is an intentional design
493 decision. It is likely we will want to be able to cancel a continuation
494 in some circumstances, but I think it should be explicit not implicit
495 in the destruction of the Future used to create it.
498 [state = futures::detail::makeCoreCallbackState(
499 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
501 if (!isTry && t.hasException()) {
502 state.setException(std::move(t.exception()));
504 state.setTry(makeTryWith(
505 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
511 // Variant: returns a Future
512 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
514 template <typename F, typename R, bool isTry, typename... Args>
515 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
516 SemiFuture<T>::thenImplementation(
518 futures::detail::argResult<isTry, F, Args...>) {
519 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
520 typedef typename R::ReturnsFuture::Inner B;
521 this->throwIfInvalid();
524 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
526 // grab the Future now before we lose our handle on the Promise
527 auto f = p.getFuture();
528 f.core_->setExecutorNoLock(this->getExecutor());
531 [state = futures::detail::makeCoreCallbackState(
532 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
533 if (!isTry && t.hasException()) {
534 state.setException(std::move(t.exception()));
536 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
537 if (tf2.hasException()) {
538 state.setException(std::move(tf2.exception()));
540 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
541 p.setTry(std::move(b));
550 template <typename T>
551 template <typename R, typename Caller, typename... Args>
552 Future<typename isFuture<R>::Inner>
553 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
554 typedef typename std::remove_cv<typename std::remove_reference<
555 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
558 return then([instance, func](Try<T>&& t){
559 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
564 Future<Unit> Future<T>::then() {
565 return then([] () {});
568 // onError where the callback returns T
571 typename std::enable_if<
572 !futures::detail::callableWith<F, exception_wrapper>::value &&
573 !futures::detail::callableWith<F, exception_wrapper&>::value &&
574 !futures::detail::Extract<F>::ReturnsFuture::value,
576 Future<T>::onError(F&& func) {
577 typedef std::remove_reference_t<
578 typename futures::detail::Extract<F>::FirstArg>
581 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
582 "Return type of onError callback must be T or Future<T>");
585 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
586 auto f = p.getFuture();
589 [state = futures::detail::makeCoreCallbackState(
590 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
591 if (auto e = t.template tryGetExceptionObject<Exn>()) {
592 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
594 state.setTry(std::move(t));
601 // onError where the callback returns Future<T>
604 typename std::enable_if<
605 !futures::detail::callableWith<F, exception_wrapper>::value &&
606 !futures::detail::callableWith<F, exception_wrapper&>::value &&
607 futures::detail::Extract<F>::ReturnsFuture::value,
609 Future<T>::onError(F&& func) {
611 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
613 "Return type of onError callback must be T or Future<T>");
614 typedef std::remove_reference_t<
615 typename futures::detail::Extract<F>::FirstArg>
619 auto f = p.getFuture();
622 [state = futures::detail::makeCoreCallbackState(
623 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
624 if (auto e = t.template tryGetExceptionObject<Exn>()) {
625 auto tf2 = state.tryInvoke(*e);
626 if (tf2.hasException()) {
627 state.setException(std::move(tf2.exception()));
629 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
630 p.setTry(std::move(t3));
634 state.setTry(std::move(t));
643 Future<T> Future<T>::ensure(F&& func) {
644 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
646 return makeFuture(std::move(t));
652 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
653 return within(dur, tk).onError([funcw = std::forward<F>(func)](
654 TimedOut const&) { return std::move(funcw)(); });
659 typename std::enable_if<
660 futures::detail::callableWith<F, exception_wrapper>::value &&
661 futures::detail::Extract<F>::ReturnsFuture::value,
663 Future<T>::onError(F&& func) {
665 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
667 "Return type of onError callback must be T or Future<T>");
670 auto f = p.getFuture();
672 [state = futures::detail::makeCoreCallbackState(
673 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
674 if (t.hasException()) {
675 auto tf2 = state.tryInvoke(std::move(t.exception()));
676 if (tf2.hasException()) {
677 state.setException(std::move(tf2.exception()));
679 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
680 p.setTry(std::move(t3));
684 state.setTry(std::move(t));
691 // onError(exception_wrapper) that returns T
694 typename std::enable_if<
695 futures::detail::callableWith<F, exception_wrapper>::value &&
696 !futures::detail::Extract<F>::ReturnsFuture::value,
698 Future<T>::onError(F&& func) {
700 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
702 "Return type of onError callback must be T or Future<T>");
705 auto f = p.getFuture();
707 [state = futures::detail::makeCoreCallbackState(
708 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
709 if (t.hasException()) {
710 state.setTry(makeTryWith(
711 [&] { return state.invoke(std::move(t.exception())); }));
713 state.setTry(std::move(t));
721 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
722 return waitVia(e).getTry();
725 template <class Func>
726 auto via(Executor* x, Func&& func)
727 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
728 // TODO make this actually more performant. :-P #7260175
729 return via(x).then(std::forward<Func>(func));
733 Future<T>::Future(futures::detail::EmptyConstruct) noexcept
734 : SemiFuture<T>(futures::detail::EmptyConstruct{}) {}
739 Future<typename std::decay<T>::type> makeFuture(T&& t) {
740 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
743 inline Future<Unit> makeFuture() {
744 return makeFuture(Unit{});
747 // makeFutureWith(Future<T>()) -> Future<T>
749 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
750 typename std::result_of<F()>::type>::type
751 makeFutureWith(F&& func) {
753 typename isFuture<typename std::result_of<F()>::type>::Inner;
755 return std::forward<F>(func)();
756 } catch (std::exception& e) {
757 return makeFuture<InnerType>(
758 exception_wrapper(std::current_exception(), e));
760 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
764 // makeFutureWith(T()) -> Future<T>
765 // makeFutureWith(void()) -> Future<Unit>
767 typename std::enable_if<
768 !(isFuture<typename std::result_of<F()>::type>::value),
769 Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
770 makeFutureWith(F&& func) {
771 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
772 return makeFuture<LiftedResult>(
773 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
777 Future<T> makeFuture(std::exception_ptr const& e) {
778 return makeFuture(Try<T>(e));
782 Future<T> makeFuture(exception_wrapper ew) {
783 return makeFuture(Try<T>(std::move(ew)));
786 template <class T, class E>
787 typename std::enable_if<std::is_base_of<std::exception, E>::value,
789 makeFuture(E const& e) {
790 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
794 Future<T> makeFuture(Try<T>&& t) {
795 return Future<T>(new futures::detail::Core<T>(std::move(t)));
799 Future<Unit> via(Executor* executor, int8_t priority) {
800 return makeFuture().via(executor, priority);
803 // mapSetCallback calls func(i, Try<T>) when every future completes
805 template <class T, class InputIterator, class F>
806 void mapSetCallback(InputIterator first, InputIterator last, F func) {
807 for (size_t i = 0; first != last; ++first, ++i) {
808 first->setCallback_([func, i](Try<T>&& t) {
809 func(i, std::move(t));
814 // collectAll (variadic)
816 template <typename... Fs>
817 typename futures::detail::CollectAllVariadicContext<
818 typename std::decay<Fs>::type::value_type...>::type
819 collectAll(Fs&&... fs) {
820 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
821 typename std::decay<Fs>::type::value_type...>>();
822 futures::detail::collectVariadicHelper<
823 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
824 return ctx->p.getFuture();
827 // collectAll (iterator)
829 template <class InputIterator>
832 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
833 collectAll(InputIterator first, InputIterator last) {
835 typename std::iterator_traits<InputIterator>::value_type::value_type T;
837 struct CollectAllContext {
838 CollectAllContext(size_t n) : results(n) {}
839 ~CollectAllContext() {
840 p.setValue(std::move(results));
842 Promise<std::vector<Try<T>>> p;
843 std::vector<Try<T>> results;
847 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
848 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
849 ctx->results[i] = std::move(t);
851 return ctx->p.getFuture();
854 // collect (iterator)
859 template <typename T>
860 struct CollectContext {
862 explicit Nothing(int /* n */) {}
865 using Result = typename std::conditional<
866 std::is_void<T>::value,
868 std::vector<T>>::type;
870 using InternalResult = typename std::conditional<
871 std::is_void<T>::value,
873 std::vector<Optional<T>>>::type;
875 explicit CollectContext(size_t n) : result(n) {}
877 if (!threw.exchange(true)) {
878 // map Optional<T> -> T
879 std::vector<T> finalResult;
880 finalResult.reserve(result.size());
881 std::transform(result.begin(), result.end(),
882 std::back_inserter(finalResult),
883 [](Optional<T>& o) { return std::move(o.value()); });
884 p.setValue(std::move(finalResult));
887 inline void setPartialResult(size_t i, Try<T>& t) {
888 result[i] = std::move(t.value());
891 InternalResult result;
892 std::atomic<bool> threw {false};
895 } // namespace detail
896 } // namespace futures
898 template <class InputIterator>
899 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
900 InputIterator>::value_type::value_type>::Result>
901 collect(InputIterator first, InputIterator last) {
903 typename std::iterator_traits<InputIterator>::value_type::value_type T;
905 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
906 std::distance(first, last));
907 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
908 if (t.hasException()) {
909 if (!ctx->threw.exchange(true)) {
910 ctx->p.setException(std::move(t.exception()));
912 } else if (!ctx->threw) {
913 ctx->setPartialResult(i, t);
916 return ctx->p.getFuture();
919 // collect (variadic)
921 template <typename... Fs>
922 typename futures::detail::CollectVariadicContext<
923 typename std::decay<Fs>::type::value_type...>::type
924 collect(Fs&&... fs) {
925 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
926 typename std::decay<Fs>::type::value_type...>>();
927 futures::detail::collectVariadicHelper<
928 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
929 return ctx->p.getFuture();
932 // collectAny (iterator)
934 template <class InputIterator>
939 std::iterator_traits<InputIterator>::value_type::value_type>>>
940 collectAny(InputIterator first, InputIterator last) {
942 typename std::iterator_traits<InputIterator>::value_type::value_type T;
944 struct CollectAnyContext {
945 CollectAnyContext() {}
946 Promise<std::pair<size_t, Try<T>>> p;
947 std::atomic<bool> done {false};
950 auto ctx = std::make_shared<CollectAnyContext>();
951 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
952 if (!ctx->done.exchange(true)) {
953 ctx->p.setValue(std::make_pair(i, std::move(t)));
956 return ctx->p.getFuture();
959 // collectAnyWithoutException (iterator)
961 template <class InputIterator>
964 typename std::iterator_traits<InputIterator>::value_type::value_type>>
965 collectAnyWithoutException(InputIterator first, InputIterator last) {
967 typename std::iterator_traits<InputIterator>::value_type::value_type T;
969 struct CollectAnyWithoutExceptionContext {
970 CollectAnyWithoutExceptionContext(){}
971 Promise<std::pair<size_t, T>> p;
972 std::atomic<bool> done{false};
973 std::atomic<size_t> nFulfilled{0};
977 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
978 ctx->nTotal = size_t(std::distance(first, last));
980 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
981 if (!t.hasException() && !ctx->done.exchange(true)) {
982 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
983 } else if (++ctx->nFulfilled == ctx->nTotal) {
984 ctx->p.setException(t.exception());
987 return ctx->p.getFuture();
990 // collectN (iterator)
992 template <class InputIterator>
993 Future<std::vector<std::pair<size_t, Try<typename
994 std::iterator_traits<InputIterator>::value_type::value_type>>>>
995 collectN(InputIterator first, InputIterator last, size_t n) {
997 std::iterator_traits<InputIterator>::value_type::value_type T;
998 typedef std::vector<std::pair<size_t, Try<T>>> V;
1000 struct CollectNContext {
1002 std::atomic<size_t> completed = {0};
1005 auto ctx = std::make_shared<CollectNContext>();
1007 if (size_t(std::distance(first, last)) < n) {
1008 ctx->p.setException(std::runtime_error("Not enough futures"));
1010 // for each completed Future, increase count and add to vector, until we
1011 // have n completed futures at which point we fulfil our Promise with the
1013 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
1014 auto c = ++ctx->completed;
1016 assert(ctx->v.size() < n);
1017 ctx->v.emplace_back(i, std::move(t));
1019 ctx->p.setTry(Try<V>(std::move(ctx->v)));
1025 return ctx->p.getFuture();
1028 // reduce (iterator)
1030 template <class It, class T, class F>
1031 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1032 if (first == last) {
1033 return makeFuture(std::move(initial));
1036 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1037 typedef typename std::conditional<
1038 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1041 typedef isTry<Arg> IsTry;
1043 auto sfunc = std::make_shared<F>(std::move(func));
1045 auto f = first->then(
1046 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1048 std::move(minitial), head.template get<IsTry::value, Arg&&>());
1051 for (++first; first != last; ++first) {
1052 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1053 return (*sfunc)(std::move(std::get<0>(t).value()),
1054 // Either return a ItT&& or a Try<ItT>&& depending
1055 // on the type of the argument of func.
1056 std::get<1>(t).template get<IsTry::value, Arg&&>());
1063 // window (collection)
1065 template <class Collection, class F, class ItT, class Result>
1066 std::vector<Future<Result>>
1067 window(Collection input, F func, size_t n) {
1068 struct WindowContext {
1069 WindowContext(Collection&& i, F&& fn)
1070 : input_(std::move(i)), promises_(input_.size()),
1071 func_(std::move(fn))
1073 std::atomic<size_t> i_ {0};
1075 std::vector<Promise<Result>> promises_;
1078 static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
1079 size_t i = ctx->i_++;
1080 if (i < ctx->input_.size()) {
1081 // Using setCallback_ directly since we don't need the Future
1082 ctx->func_(std::move(ctx->input_[i])).setCallback_(
1083 // ctx is captured by value
1084 [ctx, i](Try<Result>&& t) {
1085 ctx->promises_[i].setTry(std::move(t));
1086 // Chain another future onto this one
1087 spawn(std::move(ctx));
1093 auto max = std::min(n, input.size());
1095 auto ctx = std::make_shared<WindowContext>(
1096 std::move(input), std::move(func));
1098 for (size_t i = 0; i < max; ++i) {
1099 // Start the first n Futures
1100 WindowContext::spawn(ctx);
1103 std::vector<Future<Result>> futures;
1104 futures.reserve(ctx->promises_.size());
1105 for (auto& promise : ctx->promises_) {
1106 futures.emplace_back(promise.getFuture());
1115 template <class I, class F>
1116 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1118 minitial = std::forward<I>(initial),
1119 mfunc = std::forward<F>(func)
1120 ](T& vals) mutable {
1121 auto ret = std::move(minitial);
1122 for (auto& val : vals) {
1123 ret = mfunc(std::move(ret), std::move(val));
1129 // unorderedReduce (iterator)
1131 template <class It, class T, class F, class ItT, class Arg>
1132 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1133 if (first == last) {
1134 return makeFuture(std::move(initial));
1137 typedef isTry<Arg> IsTry;
1139 struct UnorderedReduceContext {
1140 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1141 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1142 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1144 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1147 size_t numThens_; // how many Futures completed and called .then()
1148 size_t numFutures_; // how many Futures in total
1149 Promise<T> promise_;
1152 auto ctx = std::make_shared<UnorderedReduceContext>(
1153 std::move(initial), std::move(func), std::distance(first, last));
1155 mapSetCallback<ItT>(
1158 [ctx](size_t /* i */, Try<ItT>&& t) {
1159 // Futures can be completed in any order, simultaneously.
1160 // To make this non-blocking, we create a new Future chain in
1161 // the order of completion to reduce the values.
1162 // The spinlock just protects chaining a new Future, not actually
1163 // executing the reduce, which should be really fast.
1164 folly::MSLGuard lock(ctx->lock_);
1166 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1167 // Either return a ItT&& or a Try<ItT>&& depending
1168 // on the type of the argument of func.
1169 return ctx->func_(std::move(v),
1170 mt.template get<IsTry::value, Arg&&>());
1172 if (++ctx->numThens_ == ctx->numFutures_) {
1173 // After reducing the value of the last Future, fulfill the Promise
1174 ctx->memo_.setCallback_(
1175 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1179 return ctx->promise_.getFuture();
1185 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1186 return within(dur, TimedOut(), tk);
1191 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1194 Context(E ex) : exception(std::move(ex)), promise() {}
1196 Future<Unit> thisFuture;
1198 std::atomic<bool> token {false};
1201 if (this->isReady()) {
1202 return std::move(*this);
1205 std::shared_ptr<Timekeeper> tks;
1207 tks = folly::detail::getTimekeeperSingleton();
1208 tk = DCHECK_NOTNULL(tks.get());
1211 auto ctx = std::make_shared<Context>(std::move(e));
1213 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1214 if (ctx->token.exchange(true) == false) {
1215 ctx->promise.setTry(std::move(t));
1219 // Have time keeper use a weak ptr to hold ctx,
1220 // so that ctx can be deallocated as soon as the future job finished.
1221 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1222 auto lockedCtx = weakCtx.lock();
1224 // ctx already released. "this" completed first, cancel "after"
1227 // "after" completed first, cancel "this"
1228 lockedCtx->thisFuture.raise(TimedOut());
1229 if (lockedCtx->token.exchange(true) == false) {
1230 if (t.hasException()) {
1231 lockedCtx->promise.setException(std::move(t.exception()));
1233 lockedCtx->promise.setException(std::move(lockedCtx->exception));
1238 return ctx->promise.getFuture().via(this->getExecutor());
1244 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1245 return collectAll(*this, futures::sleep(dur, tk))
1246 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1247 Try<T>& t = std::get<0>(tup);
1248 return makeFuture<T>(std::move(t));
1255 template <class FutureType, typename T = typename FutureType::value_type>
1256 void waitImpl(FutureType& f) {
1257 // short-circuit if there's nothing to do
1258 if (f.isReady()) return;
1260 FutureBatonType baton;
1261 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1263 assert(f.isReady());
1266 template <class FutureType, typename T = typename FutureType::value_type>
1267 void waitImpl(FutureType& f, Duration dur) {
1268 // short-circuit if there's nothing to do
1274 auto ret = promise.getFuture();
1275 auto baton = std::make_shared<FutureBatonType>();
1276 f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
1277 promise.setTry(std::move(t));
1281 if (baton->timed_wait(dur)) {
1282 assert(f.isReady());
1287 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1288 // Set callback so to ensure that the via executor has something on it
1289 // so that once the preceding future triggers this callback, drive will
1290 // always have a callback to satisfy it
1293 f = f.via(e).then([](T&& t) { return std::move(t); });
1294 while (!f.isReady()) {
1297 assert(f.isReady());
1300 } // namespace detail
1301 } // namespace futures
1304 SemiFuture<T>& SemiFuture<T>::wait() & {
1305 futures::detail::waitImpl(*this);
1310 SemiFuture<T>&& SemiFuture<T>::wait() && {
1311 futures::detail::waitImpl(*this);
1312 return std::move(*this);
1316 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1317 futures::detail::waitImpl(*this, dur);
1322 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1323 futures::detail::waitImpl(*this, dur);
1324 return std::move(*this);
1328 T SemiFuture<T>::get() {
1329 return std::move(wait().value());
1333 T SemiFuture<T>::get(Duration dur) {
1335 if (this->isReady()) {
1336 return std::move(this->value());
1343 Future<T>& Future<T>::wait() & {
1344 futures::detail::waitImpl(*this);
1349 Future<T>&& Future<T>::wait() && {
1350 futures::detail::waitImpl(*this);
1351 return std::move(*this);
1355 Future<T>& Future<T>::wait(Duration dur) & {
1356 futures::detail::waitImpl(*this, dur);
1361 Future<T>&& Future<T>::wait(Duration dur) && {
1362 futures::detail::waitImpl(*this, dur);
1363 return std::move(*this);
1367 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1368 futures::detail::waitViaImpl(*this, e);
1373 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1374 futures::detail::waitViaImpl(*this, e);
1375 return std::move(*this);
1379 T Future<T>::getVia(DrivableExecutor* e) {
1380 return std::move(waitVia(e).value());
1387 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1388 return t1.value() == t2.value();
1391 } // namespace detail
1392 } // namespace futures
1395 Future<bool> Future<T>::willEqual(Future<T>& f) {
1396 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1397 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1398 return futures::detail::TryEquals<T>::equals(
1399 std::get<0>(t), std::get<1>(t));
1408 Future<T> Future<T>::filter(F&& predicate) {
1409 return this->then([p = std::forward<F>(predicate)](T val) {
1410 T const& valConstRef = val;
1411 if (!p(valConstRef)) {
1412 throwPredicateDoesNotObtain();
1419 inline Future<Unit> when(bool p, F&& thunk) {
1420 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1423 template <class P, class F>
1424 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1426 auto future = thunk();
1427 return future.then([
1428 predicate = std::forward<P>(predicate),
1429 thunk = std::forward<F>(thunk)
1431 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1434 return makeFuture();
1438 Future<Unit> times(const int n, F&& thunk) {
1439 return folly::whileDo(
1440 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1441 return count->fetch_add(1) < n;
1443 std::forward<F>(thunk));
1447 template <class It, class F, class ItT, class Result>
1448 std::vector<Future<Result>> map(It first, It last, F func) {
1449 std::vector<Future<Result>> results;
1450 for (auto it = first; it != last; it++) {
1451 results.push_back(it->then(func));
1457 // Instantiate the most common Future types to save compile time
1458 extern template class Future<Unit>;
1459 extern template class Future<bool>;
1460 extern template class Future<int>;
1461 extern template class Future<int64_t>;
1462 extern template class Future<std::string>;
1463 extern template class Future<double>;
1464 } // namespace folly