2 * Copyright 2014-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
23 #include <folly/Optional.h>
24 #include <folly/executors/InlineExecutor.h>
25 #include <folly/futures/Timekeeper.h>
26 #include <folly/futures/detail/Core.h>
27 #include <folly/synchronization/Baton.h>
29 #ifndef FOLLY_FUTURE_USING_FIBER
30 #if FOLLY_MOBILE || defined(__APPLE__)
31 #define FOLLY_FUTURE_USING_FIBER 0
33 #define FOLLY_FUTURE_USING_FIBER 1
34 #include <folly/fibers/Baton.h>
44 #if FOLLY_FUTURE_USING_FIBER
45 typedef folly::fibers::Baton FutureBatonType;
47 typedef folly::Baton<> FutureBatonType;
50 } // namespace futures
53 std::shared_ptr<Timekeeper> getTimekeeperSingleton();
58 // Guarantees that the stored functor is destructed before the stored promise
59 // may be fulfilled. Assumes the stored functor to be noexcept-destructible.
60 template <typename T, typename F>
61 class CoreCallbackState {
63 template <typename FF>
64 CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
65 noexcept(F(std::declval<FF>())))
66 : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
67 assert(before_barrier());
70 CoreCallbackState(CoreCallbackState&& that) noexcept(
71 noexcept(F(std::declval<F>()))) {
72 if (that.before_barrier()) {
73 new (&func_) F(std::move(that.func_));
74 promise_ = that.stealPromise();
78 CoreCallbackState& operator=(CoreCallbackState&&) = delete;
80 ~CoreCallbackState() {
81 if (before_barrier()) {
86 template <typename... Args>
87 auto invoke(Args&&... args) noexcept(
88 noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
89 assert(before_barrier());
90 return std::move(func_)(std::forward<Args>(args)...);
93 template <typename... Args>
94 auto tryInvoke(Args&&... args) noexcept {
95 return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
98 void setTry(Try<T>&& t) {
99 stealPromise().setTry(std::move(t));
102 void setException(exception_wrapper&& ew) {
103 stealPromise().setException(std::move(ew));
106 Promise<T> stealPromise() noexcept {
107 assert(before_barrier());
109 return std::move(promise_);
113 bool before_barrier() const noexcept {
114 return !promise_.isFulfilled();
120 Promise<T> promise_{Promise<T>::makeEmpty()};
123 template <typename T, typename F>
124 inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
125 noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
126 std::declval<Promise<T>&&>(),
127 std::declval<F&&>()))) {
128 return CoreCallbackState<T, _t<std::decay<F>>>(
129 std::move(p), std::forward<F>(f));
133 FutureBase<T>::FutureBase(SemiFuture<T>&& other) noexcept : core_(other.core_) {
134 other.core_ = nullptr;
138 FutureBase<T>::FutureBase(Future<T>&& other) noexcept : core_(other.core_) {
139 other.core_ = nullptr;
143 template <class T2, typename>
144 FutureBase<T>::FutureBase(T2&& val)
145 : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
148 template <typename T2>
149 FutureBase<T>::FutureBase(
150 typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
151 : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
156 typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
158 FutureBase<T>::FutureBase(in_place_t, Args&&... args)
160 new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
164 template <class FutureType>
165 void FutureBase<T>::assign(FutureType& other) noexcept {
166 std::swap(core_, other.core_);
170 FutureBase<T>::~FutureBase() {
175 T& FutureBase<T>::value() & {
176 return result().value();
180 T const& FutureBase<T>::value() const& {
181 return result().value();
185 T&& FutureBase<T>::value() && {
186 return std::move(result().value());
190 T const&& FutureBase<T>::value() const&& {
191 return std::move(result().value());
195 Try<T>& FutureBase<T>::result() & {
198 return core_->getTry();
202 Try<T> const& FutureBase<T>::result() const& {
205 return core_->getTry();
209 Try<T>&& FutureBase<T>::result() && {
212 return std::move(core_->getTry());
216 Try<T> const&& FutureBase<T>::result() const&& {
219 return std::move(core_->getTry());
223 bool FutureBase<T>::isReady() const {
225 return core_->ready();
229 bool FutureBase<T>::hasValue() {
230 return core_->getTry().hasValue();
234 bool FutureBase<T>::hasException() {
235 return core_->getTry().hasException();
239 void FutureBase<T>::detach() {
241 core_->detachFuture();
247 void FutureBase<T>::throwIfInvalid() const {
254 Optional<Try<T>> FutureBase<T>::poll() {
256 if (core_->ready()) {
257 o = std::move(core_->getTry());
263 void FutureBase<T>::raise(exception_wrapper exception) {
264 core_->raise(std::move(exception));
269 void FutureBase<T>::setCallback_(F&& func) {
271 core_->setCallback(std::forward<F>(func));
275 FutureBase<T>::FutureBase(futures::detail::EmptyConstruct) noexcept
280 // Variant: returns a value
281 // e.g. f.then([](Try<T>&& t){ return t.value(); });
283 template <typename F, typename R, bool isTry, typename... Args>
284 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
285 FutureBase<T>::thenImplementation(
287 futures::detail::argResult<isTry, F, Args...>) {
288 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
289 typedef typename R::ReturnsFuture::Inner B;
291 this->throwIfInvalid();
294 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
296 // grab the Future now before we lose our handle on the Promise
297 auto f = p.getFuture();
298 f.core_->setExecutorNoLock(this->getExecutor());
300 /* This is a bit tricky.
302 We can't just close over *this in case this Future gets moved. So we
303 make a new dummy Future. We could figure out something more
304 sophisticated that avoids making a new Future object when it can, as an
305 optimization. But this is correct.
307 core_ can't be moved, it is explicitly disallowed (as is copying). But
308 if there's ever a reason to allow it, this is one place that makes that
309 assumption and would need to be fixed. We use a standard shared pointer
310 for core_ (by copying it in), which means in essence obj holds a shared
311 pointer to itself. But this shouldn't leak because Promise will not
312 outlive the continuation, because Promise will setException() with a
313 broken Promise if it is destructed before completed. We could use a
314 weak pointer but it would have to be converted to a shared pointer when
315 func is executed (because the Future returned by func may possibly
316 persist beyond the callback, if it gets moved), and so it is an
317 optimization to just make it shared from the get-go.
319 Two subtle but important points about this design. futures::detail::Core
320 has no back pointers to Future or Promise, so if Future or Promise get
321 moved (and they will be moved in performant code) we don't have to do
322 anything fancy. And because we store the continuation in the
323 futures::detail::Core, not in the Future, we can execute the continuation
324 even after the Future has gone out of scope. This is an intentional design
325 decision. It is likely we will want to be able to cancel a continuation
326 in some circumstances, but I think it should be explicit not implicit
327 in the destruction of the Future used to create it.
330 [state = futures::detail::makeCoreCallbackState(
331 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
333 if (!isTry && t.hasException()) {
334 state.setException(std::move(t.exception()));
336 state.setTry(makeTryWith(
337 [&] { return state.invoke(t.template get<isTry, Args>()...); }));
343 // Variant: returns a Future
344 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
346 template <typename F, typename R, bool isTry, typename... Args>
347 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
348 FutureBase<T>::thenImplementation(
350 futures::detail::argResult<isTry, F, Args...>) {
351 static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
352 typedef typename R::ReturnsFuture::Inner B;
353 this->throwIfInvalid();
356 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
358 // grab the Future now before we lose our handle on the Promise
359 auto f = p.getFuture();
360 f.core_->setExecutorNoLock(this->getExecutor());
363 [state = futures::detail::makeCoreCallbackState(
364 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
365 if (!isTry && t.hasException()) {
366 state.setException(std::move(t.exception()));
368 auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
369 if (tf2.hasException()) {
370 state.setException(std::move(tf2.exception()));
372 tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
373 p.setTry(std::move(b));
381 } // namespace detail
382 } // namespace futures
385 SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
386 return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
389 // makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
391 typename std::enable_if<
392 isSemiFuture<typename std::result_of<F()>::type>::value,
393 typename std::result_of<F()>::type>::type
394 makeSemiFutureWith(F&& func) {
396 typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
398 return std::forward<F>(func)();
399 } catch (std::exception& e) {
400 return makeSemiFuture<InnerType>(
401 exception_wrapper(std::current_exception(), e));
403 return makeSemiFuture<InnerType>(
404 exception_wrapper(std::current_exception()));
408 // makeSemiFutureWith(T()) -> SemiFuture<T>
409 // makeSemiFutureWith(void()) -> SemiFuture<Unit>
411 typename std::enable_if<
412 !(isSemiFuture<typename std::result_of<F()>::type>::value),
413 SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
414 makeSemiFutureWith(F&& func) {
415 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
416 return makeSemiFuture<LiftedResult>(
417 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
421 SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
422 return makeSemiFuture(Try<T>(e));
426 SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
427 return makeSemiFuture(Try<T>(std::move(ew)));
430 template <class T, class E>
432 enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
433 makeSemiFuture(E const& e) {
434 return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
438 SemiFuture<T> makeSemiFuture(Try<T>&& t) {
439 return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
442 // This must be defined after the constructors to avoid a bug in MSVC
443 // https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
444 inline SemiFuture<Unit> makeSemiFuture() {
445 return makeSemiFuture(Unit{});
449 SemiFuture<T> SemiFuture<T>::makeEmpty() {
450 return SemiFuture<T>(futures::detail::EmptyConstruct{});
454 SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept
455 : futures::detail::FutureBase<T>(std::move(other)) {}
458 SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept
459 : futures::detail::FutureBase<T>(std::move(other)) {
460 // SemiFuture should not have an executor on construction
462 this->setExecutor(nullptr);
467 SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
473 SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
475 // SemiFuture should not have an executor on construction
477 this->setExecutor(nullptr);
483 void SemiFuture<T>::boost_() {
484 // If a SemiFuture has an executor it should be deferred, so boost it
485 if (auto e = this->getExecutor()) {
486 // We know in a SemiFuture that if we have an executor it should be
487 // DeferredExecutor. Verify this in debug mode.
488 DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(e));
490 auto ka = static_cast<DeferredExecutor*>(e)->getKeepAliveToken();
491 static_cast<DeferredExecutor*>(e)->boost();
496 inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
502 // If current executor is deferred, boost block to ensure that work
503 // progresses and is run on the new executor.
504 auto oldExecutor = this->getExecutor();
505 if (oldExecutor && executor && (executor != oldExecutor)) {
506 // We know in a SemiFuture that if we have an executor it should be
507 // DeferredExecutor. Verify this in debug mode.
508 DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(this->getExecutor()));
509 if (static_cast<DeferredExecutor*>(oldExecutor)) {
510 executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() {
511 static_cast<DeferredExecutor*>(oldExecutorKA.get())->boost();
516 this->setExecutor(executor, priority);
518 auto newFuture = Future<T>(this->core_);
519 this->core_ = nullptr;
524 template <typename F>
525 SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
526 SemiFuture<T>::defer(F&& func) && {
527 // If we already have a deferred executor, use it, otherwise create one
528 auto defKeepAlive = this->getExecutor()
529 ? this->getExecutor()->getKeepAliveToken()
530 : DeferredExecutor::create();
531 auto e = defKeepAlive.get();
532 // We know in a SemiFuture that if we have an executor it should be
533 // DeferredExecutor (either it was that way before, or we just created it).
534 // Verify this in debug mode.
535 DCHECK(nullptr != dynamic_cast<DeferredExecutor*>(e));
536 // Convert to a folly::future with a deferred executor
537 // Will be low-cost if this is not a new executor as via optimises for that
542 // Then add the work, with a wrapper function that captures the
543 // keepAlive so the executor is destroyed at the right time.
545 DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func)))
546 // Finally, convert back o a folly::SemiFuture to hide the executor
548 // Carry deferred executor through chain as constructor from Future will
555 Future<T> Future<T>::makeEmpty() {
556 return Future<T>(futures::detail::EmptyConstruct{});
560 Future<T>::Future(Future<T>&& other) noexcept
561 : futures::detail::FutureBase<T>(std::move(other)) {}
564 Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
572 typename std::enable_if<
573 !std::is_same<T, typename std::decay<T2>::type>::value &&
574 std::is_constructible<T, T2&&>::value &&
575 std::is_convertible<T2&&, T>::value,
577 Future<T>::Future(Future<T2>&& other)
578 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
583 typename std::enable_if<
584 !std::is_same<T, typename std::decay<T2>::type>::value &&
585 std::is_constructible<T, T2&&>::value &&
586 !std::is_convertible<T2&&, T>::value,
588 Future<T>::Future(Future<T2>&& other)
589 : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
594 typename std::enable_if<
595 !std::is_same<T, typename std::decay<T2>::type>::value &&
596 std::is_constructible<T, T2&&>::value,
598 Future<T>& Future<T>::operator=(Future<T2>&& other) {
600 std::move(other).then([](T2&& v) { return T(std::move(v)); }));
608 enable_if<isFuture<F>::value, Future<typename isFuture<T>::Inner>>::type
609 Future<T>::unwrap() {
610 return then([](Future<typename isFuture<T>::Inner> internal_future) {
611 return internal_future;
616 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
617 this->throwIfInvalid();
619 this->setExecutor(executor, priority);
621 auto newFuture = Future<T>(this->core_);
622 this->core_ = nullptr;
627 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
628 this->throwIfInvalid();
630 auto f = p.getFuture();
631 auto func = [p = std::move(p)](Try<T>&& t) mutable {
632 p.setTry(std::move(t));
634 using R = futures::detail::callableResult<T, decltype(func)>;
635 this->template thenImplementation<decltype(func), R>(
636 std::move(func), typename R::Arg());
637 return std::move(f).via(executor, priority);
640 template <typename T>
641 template <typename R, typename Caller, typename... Args>
642 Future<typename isFuture<R>::Inner>
643 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
644 typedef typename std::remove_cv<typename std::remove_reference<
645 typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
648 return then([instance, func](Try<T>&& t){
649 return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
654 Future<Unit> Future<T>::then() {
655 return then([] () {});
658 // onError where the callback returns T
661 typename std::enable_if<
662 !futures::detail::callableWith<F, exception_wrapper>::value &&
663 !futures::detail::callableWith<F, exception_wrapper&>::value &&
664 !futures::detail::Extract<F>::ReturnsFuture::value,
666 Future<T>::onError(F&& func) {
667 typedef std::remove_reference_t<
668 typename futures::detail::Extract<F>::FirstArg>
671 std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
672 "Return type of onError callback must be T or Future<T>");
675 p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
676 auto f = p.getFuture();
679 [state = futures::detail::makeCoreCallbackState(
680 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
681 if (auto e = t.template tryGetExceptionObject<Exn>()) {
682 state.setTry(makeTryWith([&] { return state.invoke(*e); }));
684 state.setTry(std::move(t));
691 // onError where the callback returns Future<T>
694 typename std::enable_if<
695 !futures::detail::callableWith<F, exception_wrapper>::value &&
696 !futures::detail::callableWith<F, exception_wrapper&>::value &&
697 futures::detail::Extract<F>::ReturnsFuture::value,
699 Future<T>::onError(F&& func) {
701 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
703 "Return type of onError callback must be T or Future<T>");
704 typedef std::remove_reference_t<
705 typename futures::detail::Extract<F>::FirstArg>
709 auto f = p.getFuture();
712 [state = futures::detail::makeCoreCallbackState(
713 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
714 if (auto e = t.template tryGetExceptionObject<Exn>()) {
715 auto tf2 = state.tryInvoke(*e);
716 if (tf2.hasException()) {
717 state.setException(std::move(tf2.exception()));
719 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
720 p.setTry(std::move(t3));
724 state.setTry(std::move(t));
733 Future<T> Future<T>::ensure(F&& func) {
734 return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
736 return makeFuture(std::move(t));
742 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
743 return within(dur, tk).onError([funcw = std::forward<F>(func)](
744 TimedOut const&) { return std::move(funcw)(); });
749 typename std::enable_if<
750 futures::detail::callableWith<F, exception_wrapper>::value &&
751 futures::detail::Extract<F>::ReturnsFuture::value,
753 Future<T>::onError(F&& func) {
755 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
757 "Return type of onError callback must be T or Future<T>");
760 auto f = p.getFuture();
762 [state = futures::detail::makeCoreCallbackState(
763 std::move(p), std::forward<F>(func))](Try<T> t) mutable {
764 if (t.hasException()) {
765 auto tf2 = state.tryInvoke(std::move(t.exception()));
766 if (tf2.hasException()) {
767 state.setException(std::move(tf2.exception()));
769 tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
770 p.setTry(std::move(t3));
774 state.setTry(std::move(t));
781 // onError(exception_wrapper) that returns T
784 typename std::enable_if<
785 futures::detail::callableWith<F, exception_wrapper>::value &&
786 !futures::detail::Extract<F>::ReturnsFuture::value,
788 Future<T>::onError(F&& func) {
790 std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
792 "Return type of onError callback must be T or Future<T>");
795 auto f = p.getFuture();
797 [state = futures::detail::makeCoreCallbackState(
798 std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
799 if (t.hasException()) {
800 state.setTry(makeTryWith(
801 [&] { return state.invoke(std::move(t.exception())); }));
803 state.setTry(std::move(t));
810 template <class Func>
811 auto via(Executor* x, Func&& func)
812 -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
813 // TODO make this actually more performant. :-P #7260175
814 return via(x).then(std::forward<Func>(func));
820 Future<typename std::decay<T>::type> makeFuture(T&& t) {
821 return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
824 inline Future<Unit> makeFuture() {
825 return makeFuture(Unit{});
828 // makeFutureWith(Future<T>()) -> Future<T>
830 typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
831 typename std::result_of<F()>::type>::type
832 makeFutureWith(F&& func) {
834 typename isFuture<typename std::result_of<F()>::type>::Inner;
836 return std::forward<F>(func)();
837 } catch (std::exception& e) {
838 return makeFuture<InnerType>(
839 exception_wrapper(std::current_exception(), e));
841 return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
845 // makeFutureWith(T()) -> Future<T>
846 // makeFutureWith(void()) -> Future<Unit>
848 typename std::enable_if<
849 !(isFuture<typename std::result_of<F()>::type>::value),
850 Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
851 makeFutureWith(F&& func) {
852 using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
853 return makeFuture<LiftedResult>(
854 makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
858 Future<T> makeFuture(std::exception_ptr const& e) {
859 return makeFuture(Try<T>(e));
863 Future<T> makeFuture(exception_wrapper ew) {
864 return makeFuture(Try<T>(std::move(ew)));
867 template <class T, class E>
868 typename std::enable_if<std::is_base_of<std::exception, E>::value,
870 makeFuture(E const& e) {
871 return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
875 Future<T> makeFuture(Try<T>&& t) {
876 return Future<T>(new futures::detail::Core<T>(std::move(t)));
880 Future<Unit> via(Executor* executor, int8_t priority) {
881 return makeFuture().via(executor, priority);
884 // mapSetCallback calls func(i, Try<T>) when every future completes
886 template <class T, class InputIterator, class F>
887 void mapSetCallback(InputIterator first, InputIterator last, F func) {
888 for (size_t i = 0; first != last; ++first, ++i) {
889 first->setCallback_([func, i](Try<T>&& t) {
890 func(i, std::move(t));
895 // collectAll (variadic)
897 template <typename... Fs>
898 typename futures::detail::CollectAllVariadicContext<
899 typename std::decay<Fs>::type::value_type...>::type
900 collectAll(Fs&&... fs) {
901 auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
902 typename std::decay<Fs>::type::value_type...>>();
903 futures::detail::collectVariadicHelper<
904 futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
905 return ctx->p.getFuture();
908 // collectAll (iterator)
910 template <class InputIterator>
913 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
914 collectAll(InputIterator first, InputIterator last) {
916 typename std::iterator_traits<InputIterator>::value_type::value_type T;
918 struct CollectAllContext {
919 CollectAllContext(size_t n) : results(n) {}
920 ~CollectAllContext() {
921 p.setValue(std::move(results));
923 Promise<std::vector<Try<T>>> p;
924 std::vector<Try<T>> results;
928 std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
929 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
930 ctx->results[i] = std::move(t);
932 return ctx->p.getFuture();
935 // collect (iterator)
940 template <typename T>
941 struct CollectContext {
943 explicit Nothing(int /* n */) {}
946 using Result = typename std::conditional<
947 std::is_void<T>::value,
949 std::vector<T>>::type;
951 using InternalResult = typename std::conditional<
952 std::is_void<T>::value,
954 std::vector<Optional<T>>>::type;
956 explicit CollectContext(size_t n) : result(n) {
957 finalResult.reserve(n);
960 if (!threw.exchange(true)) {
961 // map Optional<T> -> T
962 std::transform(result.begin(), result.end(),
963 std::back_inserter(finalResult),
964 [](Optional<T>& o) { return std::move(o.value()); });
965 p.setValue(std::move(finalResult));
968 inline void setPartialResult(size_t i, Try<T>& t) {
969 result[i] = std::move(t.value());
972 InternalResult result;
974 std::atomic<bool> threw {false};
977 } // namespace detail
978 } // namespace futures
980 template <class InputIterator>
981 Future<typename futures::detail::CollectContext<typename std::iterator_traits<
982 InputIterator>::value_type::value_type>::Result>
983 collect(InputIterator first, InputIterator last) {
985 typename std::iterator_traits<InputIterator>::value_type::value_type T;
987 auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
988 std::distance(first, last));
989 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
990 if (t.hasException()) {
991 if (!ctx->threw.exchange(true)) {
992 ctx->p.setException(std::move(t.exception()));
994 } else if (!ctx->threw) {
995 ctx->setPartialResult(i, t);
998 return ctx->p.getFuture();
1001 // collect (variadic)
1003 template <typename... Fs>
1004 typename futures::detail::CollectVariadicContext<
1005 typename std::decay<Fs>::type::value_type...>::type
1006 collect(Fs&&... fs) {
1007 auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
1008 typename std::decay<Fs>::type::value_type...>>();
1009 futures::detail::collectVariadicHelper<
1010 futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
1011 return ctx->p.getFuture();
1014 // collectAny (iterator)
1016 template <class InputIterator>
1021 std::iterator_traits<InputIterator>::value_type::value_type>>>
1022 collectAny(InputIterator first, InputIterator last) {
1024 typename std::iterator_traits<InputIterator>::value_type::value_type T;
1026 struct CollectAnyContext {
1027 CollectAnyContext() {}
1028 Promise<std::pair<size_t, Try<T>>> p;
1029 std::atomic<bool> done {false};
1032 auto ctx = std::make_shared<CollectAnyContext>();
1033 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
1034 if (!ctx->done.exchange(true)) {
1035 ctx->p.setValue(std::make_pair(i, std::move(t)));
1038 return ctx->p.getFuture();
1041 // collectAnyWithoutException (iterator)
1043 template <class InputIterator>
1046 typename std::iterator_traits<InputIterator>::value_type::value_type>>
1047 collectAnyWithoutException(InputIterator first, InputIterator last) {
1049 typename std::iterator_traits<InputIterator>::value_type::value_type T;
1051 struct CollectAnyWithoutExceptionContext {
1052 CollectAnyWithoutExceptionContext(){}
1053 Promise<std::pair<size_t, T>> p;
1054 std::atomic<bool> done{false};
1055 std::atomic<size_t> nFulfilled{0};
1059 auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
1060 ctx->nTotal = size_t(std::distance(first, last));
1062 mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
1063 if (!t.hasException() && !ctx->done.exchange(true)) {
1064 ctx->p.setValue(std::make_pair(i, std::move(t.value())));
1065 } else if (++ctx->nFulfilled == ctx->nTotal) {
1066 ctx->p.setException(t.exception());
1069 return ctx->p.getFuture();
1072 // collectN (iterator)
1074 template <class InputIterator>
1075 Future<std::vector<std::pair<size_t, Try<typename
1076 std::iterator_traits<InputIterator>::value_type::value_type>>>>
1077 collectN(InputIterator first, InputIterator last, size_t n) {
1079 std::iterator_traits<InputIterator>::value_type::value_type T;
1080 typedef std::vector<std::pair<size_t, Try<T>>> V;
1082 struct CollectNContext {
1084 std::atomic<size_t> completed = {0};
1087 auto ctx = std::make_shared<CollectNContext>();
1089 if (size_t(std::distance(first, last)) < n) {
1090 ctx->p.setException(std::runtime_error("Not enough futures"));
1092 // for each completed Future, increase count and add to vector, until we
1093 // have n completed futures at which point we fulfil our Promise with the
1095 mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
1096 auto c = ++ctx->completed;
1098 assert(ctx->v.size() < n);
1099 ctx->v.emplace_back(i, std::move(t));
1101 ctx->p.setTry(Try<V>(std::move(ctx->v)));
1107 return ctx->p.getFuture();
1110 // reduce (iterator)
1112 template <class It, class T, class F>
1113 Future<T> reduce(It first, It last, T&& initial, F&& func) {
1114 if (first == last) {
1115 return makeFuture(std::move(initial));
1118 typedef typename std::iterator_traits<It>::value_type::value_type ItT;
1119 typedef typename std::conditional<
1120 futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
1123 typedef isTry<Arg> IsTry;
1125 auto sfunc = std::make_shared<F>(std::move(func));
1127 auto f = first->then(
1128 [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
1130 std::move(minitial), head.template get<IsTry::value, Arg&&>());
1133 for (++first; first != last; ++first) {
1134 f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
1135 return (*sfunc)(std::move(std::get<0>(t).value()),
1136 // Either return a ItT&& or a Try<ItT>&& depending
1137 // on the type of the argument of func.
1138 std::get<1>(t).template get<IsTry::value, Arg&&>());
1145 // window (collection)
1147 template <class Collection, class F, class ItT, class Result>
1148 std::vector<Future<Result>>
1149 window(Collection input, F func, size_t n) {
1150 // Use global inline executor singleton
1151 auto executor = &InlineExecutor::instance();
1152 return window(executor, std::move(input), std::move(func), n);
1155 template <class Collection, class F, class ItT, class Result>
1156 std::vector<Future<Result>>
1157 window(Executor* executor, Collection input, F func, size_t n) {
1158 struct WindowContext {
1159 WindowContext(Executor* executor_, Collection&& input_, F&& func_)
1160 : executor(executor_),
1161 input(std::move(input_)),
1162 promises(input.size()),
1163 func(std::move(func_)) {}
1164 std::atomic<size_t> i{0};
1167 std::vector<Promise<Result>> promises;
1170 static inline void spawn(std::shared_ptr<WindowContext> ctx) {
1171 size_t i = ctx->i++;
1172 if (i < ctx->input.size()) {
1173 auto fut = ctx->func(std::move(ctx->input[i]));
1174 fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
1175 const auto executor_ = ctx->executor;
1176 executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
1177 ctx->promises[i].setTry(std::move(t));
1178 // Chain another future onto this one
1179 spawn(std::move(ctx));
1186 auto max = std::min(n, input.size());
1188 auto ctx = std::make_shared<WindowContext>(
1189 executor, std::move(input), std::move(func));
1191 // Start the first n Futures
1192 for (size_t i = 0; i < max; ++i) {
1193 executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
1196 std::vector<Future<Result>> futures;
1197 futures.reserve(ctx->promises.size());
1198 for (auto& promise : ctx->promises) {
1199 futures.emplace_back(promise.getFuture());
1208 template <class I, class F>
1209 Future<I> Future<T>::reduce(I&& initial, F&& func) {
1211 minitial = std::forward<I>(initial),
1212 mfunc = std::forward<F>(func)
1213 ](T& vals) mutable {
1214 auto ret = std::move(minitial);
1215 for (auto& val : vals) {
1216 ret = mfunc(std::move(ret), std::move(val));
1222 // unorderedReduce (iterator)
1224 template <class It, class T, class F, class ItT, class Arg>
1225 Future<T> unorderedReduce(It first, It last, T initial, F func) {
1226 if (first == last) {
1227 return makeFuture(std::move(initial));
1230 typedef isTry<Arg> IsTry;
1232 struct UnorderedReduceContext {
1233 UnorderedReduceContext(T&& memo, F&& fn, size_t n)
1234 : lock_(), memo_(makeFuture<T>(std::move(memo))),
1235 func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
1237 folly::MicroSpinLock lock_; // protects memo_ and numThens_
1240 size_t numThens_; // how many Futures completed and called .then()
1241 size_t numFutures_; // how many Futures in total
1242 Promise<T> promise_;
1245 auto ctx = std::make_shared<UnorderedReduceContext>(
1246 std::move(initial), std::move(func), std::distance(first, last));
1248 mapSetCallback<ItT>(
1251 [ctx](size_t /* i */, Try<ItT>&& t) {
1252 // Futures can be completed in any order, simultaneously.
1253 // To make this non-blocking, we create a new Future chain in
1254 // the order of completion to reduce the values.
1255 // The spinlock just protects chaining a new Future, not actually
1256 // executing the reduce, which should be really fast.
1257 folly::MSLGuard lock(ctx->lock_);
1259 ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
1260 // Either return a ItT&& or a Try<ItT>&& depending
1261 // on the type of the argument of func.
1262 return ctx->func_(std::move(v),
1263 mt.template get<IsTry::value, Arg&&>());
1265 if (++ctx->numThens_ == ctx->numFutures_) {
1266 // After reducing the value of the last Future, fulfill the Promise
1267 ctx->memo_.setCallback_(
1268 [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
1272 return ctx->promise_.getFuture();
1278 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
1279 return within(dur, TimedOut(), tk);
1284 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
1287 Context(E ex) : exception(std::move(ex)), promise() {}
1289 Future<Unit> thisFuture;
1291 std::atomic<bool> token {false};
1294 if (this->isReady()) {
1295 return std::move(*this);
1298 std::shared_ptr<Timekeeper> tks;
1300 tks = folly::detail::getTimekeeperSingleton();
1304 if (UNLIKELY(!tk)) {
1305 return makeFuture<T>(NoTimekeeper());
1308 auto ctx = std::make_shared<Context>(std::move(e));
1310 ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
1311 if (ctx->token.exchange(true) == false) {
1312 ctx->promise.setTry(std::move(t));
1316 // Have time keeper use a weak ptr to hold ctx,
1317 // so that ctx can be deallocated as soon as the future job finished.
1318 tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
1319 auto lockedCtx = weakCtx.lock();
1321 // ctx already released. "this" completed first, cancel "after"
1324 // "after" completed first, cancel "this"
1325 lockedCtx->thisFuture.raise(TimedOut());
1326 if (lockedCtx->token.exchange(true) == false) {
1327 if (t.hasException()) {
1328 lockedCtx->promise.setException(std::move(t.exception()));
1330 lockedCtx->promise.setException(std::move(lockedCtx->exception));
1335 return ctx->promise.getFuture().via(this->getExecutor());
1341 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
1342 return collectAll(*this, futures::sleep(dur, tk))
1343 .then([](std::tuple<Try<T>, Try<Unit>> tup) {
1344 Try<T>& t = std::get<0>(tup);
1345 return makeFuture<T>(std::move(t));
1353 void doBoost(folly::Future<T>& /* usused */) {}
1356 void doBoost(folly::SemiFuture<T>& f) {
1360 template <class FutureType, typename T = typename FutureType::value_type>
1361 void waitImpl(FutureType& f) {
1362 // short-circuit if there's nothing to do
1367 FutureBatonType baton;
1368 f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
1371 assert(f.isReady());
1374 template <class FutureType, typename T = typename FutureType::value_type>
1375 void waitImpl(FutureType& f, Duration dur) {
1376 // short-circuit if there's nothing to do
1382 auto ret = promise.getFuture();
1383 auto baton = std::make_shared<FutureBatonType>();
1384 f.setCallback_([baton, promise = std::move(promise)](Try<T>&& t) mutable {
1385 promise.setTry(std::move(t));
1390 if (baton->try_wait_for(dur)) {
1391 assert(f.isReady());
1396 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
1397 // Set callback so to ensure that the via executor has something on it
1398 // so that once the preceding future triggers this callback, drive will
1399 // always have a callback to satisfy it
1403 f = f.via(e).then([](T&& t) { return std::move(t); });
1404 while (!f.isReady()) {
1407 assert(f.isReady());
1411 void waitViaImpl(SemiFuture<T>& f, DrivableExecutor* e) {
1412 // Set callback so to ensure that the via executor has something on it
1413 // so that once the preceding future triggers this callback, drive will
1414 // always have a callback to satisfy it
1418 f = std::move(f).via(e).then([](T&& t) { return std::move(t); });
1419 while (!f.isReady()) {
1422 assert(f.isReady());
1425 } // namespace detail
1426 } // namespace futures
1429 SemiFuture<T>& SemiFuture<T>::wait() & {
1430 futures::detail::waitImpl(*this);
1435 SemiFuture<T>&& SemiFuture<T>::wait() && {
1436 futures::detail::waitImpl(*this);
1437 return std::move(*this);
1441 SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
1442 futures::detail::waitImpl(*this, dur);
1447 SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
1448 futures::detail::waitImpl(*this, dur);
1449 return std::move(*this);
1453 SemiFuture<T>& SemiFuture<T>::waitVia(DrivableExecutor* e) & {
1454 futures::detail::waitViaImpl(*this, e);
1459 SemiFuture<T>&& SemiFuture<T>::waitVia(DrivableExecutor* e) && {
1460 futures::detail::waitViaImpl(*this, e);
1461 return std::move(*this);
1465 T SemiFuture<T>::get() && {
1466 return std::move(wait()).value();
1470 T SemiFuture<T>::get(Duration dur) && {
1472 if (this->isReady()) {
1473 return std::move(this->value());
1480 Try<T> SemiFuture<T>::getTry() && {
1481 return std::move(wait()).result();
1485 Try<T> SemiFuture<T>::getTry(Duration dur) && {
1487 if (this->isReady()) {
1488 return std::move(this->result());
1495 T SemiFuture<T>::getVia(DrivableExecutor* e) && {
1496 return std::move(waitVia(e)).value();
1500 Try<T> SemiFuture<T>::getTryVia(DrivableExecutor* e) && {
1501 return std::move(waitVia(e)).result();
1505 Future<T>& Future<T>::wait() & {
1506 futures::detail::waitImpl(*this);
1511 Future<T>&& Future<T>::wait() && {
1512 futures::detail::waitImpl(*this);
1513 return std::move(*this);
1517 Future<T>& Future<T>::wait(Duration dur) & {
1518 futures::detail::waitImpl(*this, dur);
1523 Future<T>&& Future<T>::wait(Duration dur) && {
1524 futures::detail::waitImpl(*this, dur);
1525 return std::move(*this);
1529 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
1530 futures::detail::waitViaImpl(*this, e);
1535 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
1536 futures::detail::waitViaImpl(*this, e);
1537 return std::move(*this);
1541 T Future<T>::get() {
1542 return std::move(wait().value());
1546 T Future<T>::get(Duration dur) {
1548 if (this->isReady()) {
1549 return std::move(this->value());
1556 Try<T>& Future<T>::getTry() {
1561 T Future<T>::getVia(DrivableExecutor* e) {
1562 return std::move(waitVia(e).value());
1566 Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
1567 return waitVia(e).getTry();
1574 static bool equals(const Try<T>& t1, const Try<T>& t2) {
1575 return t1.value() == t2.value();
1578 } // namespace detail
1579 } // namespace futures
1582 Future<bool> Future<T>::willEqual(Future<T>& f) {
1583 return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
1584 if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
1585 return futures::detail::TryEquals<T>::equals(
1586 std::get<0>(t), std::get<1>(t));
1595 Future<T> Future<T>::filter(F&& predicate) {
1596 return this->then([p = std::forward<F>(predicate)](T val) {
1597 T const& valConstRef = val;
1598 if (!p(valConstRef)) {
1599 throwPredicateDoesNotObtain();
1606 inline Future<Unit> when(bool p, F&& thunk) {
1607 return p ? std::forward<F>(thunk)().unit() : makeFuture();
1610 template <class P, class F>
1611 Future<Unit> whileDo(P&& predicate, F&& thunk) {
1613 auto future = thunk();
1614 return future.then([
1615 predicate = std::forward<P>(predicate),
1616 thunk = std::forward<F>(thunk)
1618 return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
1621 return makeFuture();
1625 Future<Unit> times(const int n, F&& thunk) {
1626 return folly::whileDo(
1627 [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
1628 return count->fetch_add(1) < n;
1630 std::forward<F>(thunk));
1634 template <class It, class F, class ItT, class Result>
1635 std::vector<Future<Result>> map(It first, It last, F func) {
1636 std::vector<Future<Result>> results;
1637 for (auto it = first; it != last; it++) {
1638 results.push_back(it->then(func));
1642 } // namespace futures
1644 // Instantiate the most common Future types to save compile time
1645 extern template class Future<Unit>;
1646 extern template class Future<bool>;
1647 extern template class Future<int>;
1648 extern template class Future<int64_t>;
1649 extern template class Future<std::string>;
1650 extern template class Future<double>;
1651 } // namespace folly