X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Ffutures%2FFuture-inl.h;h=56b98410abcfe59d24cfcdbe89caecf08a0d8ba2;hp=c4f9ca2f307ab6b49e6c1286f1947346d72867f4;hb=ff18deaf720fbe59551a7ff275b09003a61c4351;hpb=4762a35e70fb08622baed5d30157c99c215f4f15 diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index c4f9ca2f..56b98410 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -1,5 +1,5 @@ /* - * Copyright 2017 Facebook, Inc. + * Copyright 2017-present Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,72 +13,230 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #pragma once #include #include #include -#include #include -#include + #include -#include -#include -#include +#include #include +#include +#include +#ifndef FOLLY_FUTURE_USING_FIBER #if FOLLY_MOBILE || defined(__APPLE__) #define FOLLY_FUTURE_USING_FIBER 0 #else #define FOLLY_FUTURE_USING_FIBER 1 #include #endif +#endif namespace folly { class Timekeeper; +namespace futures { namespace detail { #if FOLLY_FUTURE_USING_FIBER typedef folly::fibers::Baton FutureBatonType; #else typedef folly::Baton<> FutureBatonType; #endif -} +} // namespace detail +} // namespace futures namespace detail { - std::shared_ptr getTimekeeperSingleton(); +std::shared_ptr getTimekeeperSingleton(); +} // namespace detail + +namespace futures { +namespace detail { +// Guarantees that the stored functor is destructed before the stored promise +// may be fulfilled. Assumes the stored functor to be noexcept-destructible. +template +class CoreCallbackState { + public: + template + CoreCallbackState(Promise&& promise, FF&& func) noexcept( + noexcept(F(std::declval()))) + : func_(std::forward(func)), promise_(std::move(promise)) { + assert(before_barrier()); + } + + CoreCallbackState(CoreCallbackState&& that) noexcept( + noexcept(F(std::declval()))) { + if (that.before_barrier()) { + new (&func_) F(std::move(that.func_)); + promise_ = that.stealPromise(); + } + } + + CoreCallbackState& operator=(CoreCallbackState&&) = delete; + + ~CoreCallbackState() { + if (before_barrier()) { + stealPromise(); + } + } + + template + auto invoke(Args&&... args) noexcept( + noexcept(std::declval()(std::declval()...))) { + assert(before_barrier()); + return std::move(func_)(std::forward(args)...); + } + + template + auto tryInvoke(Args&&... args) noexcept { + return makeTryWith([&] { return invoke(std::forward(args)...); }); + } + + void setTry(Try&& t) { + stealPromise().setTry(std::move(t)); + } + + void setException(exception_wrapper&& ew) { + stealPromise().setException(std::move(ew)); + } + + Promise stealPromise() noexcept { + assert(before_barrier()); + func_.~F(); + return std::move(promise_); + } + + private: + bool before_barrier() const noexcept { + return !promise_.isFulfilled(); + } + + union { + F func_; + }; + Promise promise_{Promise::makeEmpty()}; +}; + +template +inline auto makeCoreCallbackState(Promise&& p, F&& f) noexcept( + noexcept(CoreCallbackState>>( + std::declval&&>(), + std::declval()))) { + return CoreCallbackState>>( + std::move(p), std::forward(f)); } template -Future::Future(Future&& other) noexcept : core_(other.core_) { +FutureBase::FutureBase(SemiFuture&& other) noexcept : core_(other.core_) { other.core_ = nullptr; } template -Future& Future::operator=(Future&& other) noexcept { - std::swap(core_, other.core_); - return *this; +FutureBase::FutureBase(Future&& other) noexcept : core_(other.core_) { + other.core_ = nullptr; } template template -Future::Future(T2&& val) - : core_(new detail::Core(Try(std::forward(val)))) {} +FutureBase::FutureBase(T2&& val) + : core_(new futures::detail::Core(Try(std::forward(val)))) {} template template -Future::Future(typename std::enable_if::value>::type*) - : core_(new detail::Core(Try(T()))) {} +FutureBase::FutureBase( + typename std::enable_if::value>::type*) + : core_(new futures::detail::Core(Try(T()))) {} + +template +template < + class... Args, + typename std::enable_if::value, int>:: + type> +FutureBase::FutureBase(in_place_t, Args&&... args) + : core_( + new futures::detail::Core(in_place, std::forward(args)...)) { +} + +template +template +void FutureBase::assign(FutureType& other) noexcept { + std::swap(core_, other.core_); +} template -Future::~Future() { +FutureBase::~FutureBase() { detach(); } template -void Future::detach() { +T& FutureBase::value() & { + return result().value(); +} + +template +T const& FutureBase::value() const& { + return result().value(); +} + +template +T&& FutureBase::value() && { + return std::move(result().value()); +} + +template +T const&& FutureBase::value() const&& { + return std::move(result().value()); +} + +template +Try& FutureBase::result() & { + throwIfInvalid(); + + return core_->getTry(); +} + +template +Try const& FutureBase::result() const& { + throwIfInvalid(); + + return core_->getTry(); +} + +template +Try&& FutureBase::result() && { + throwIfInvalid(); + + return std::move(core_->getTry()); +} + +template +Try const&& FutureBase::result() const&& { + throwIfInvalid(); + + return std::move(core_->getTry()); +} + +template +bool FutureBase::isReady() const { + throwIfInvalid(); + return core_->ready(); +} + +template +bool FutureBase::hasValue() { + return core_->getTry().hasValue(); +} + +template +bool FutureBase::hasException() { + return core_->getTry().hasException(); +} + +template +void FutureBase::detach() { if (core_) { core_->detachFuture(); core_ = nullptr; @@ -86,30 +244,37 @@ void Future::detach() { } template -void Future::throwIfInvalid() const { - if (!core_) - throw NoState(); +void FutureBase::throwIfInvalid() const { + if (!core_) { + throwNoState(); + } } template -template -void Future::setCallback_(F&& func) { - throwIfInvalid(); - core_->setCallback(std::forward(func)); +Optional> FutureBase::poll() { + Optional> o; + if (core_->ready()) { + o = std::move(core_->getTry()); + } + return o; } -// unwrap +template +void FutureBase::raise(exception_wrapper exception) { + core_->raise(std::move(exception)); +} template template -typename std::enable_if::value, - Future::Inner>>::type -Future::unwrap() { - return then([](Future::Inner> internal_future) { - return internal_future; - }); +void FutureBase::setCallback_(F&& func) { + throwIfInvalid(); + core_->setCallback(std::forward(func)); } +template +FutureBase::FutureBase(futures::detail::EmptyConstruct) noexcept + : core_(nullptr) {} + // then // Variant: returns a value @@ -117,18 +282,20 @@ Future::unwrap() { template template typename std::enable_if::type -Future::thenImplementation(F&& func, detail::argResult) { +FutureBase::thenImplementation( + F&& func, + futures::detail::argResult) { static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument"); typedef typename R::ReturnsFuture::Inner B; - throwIfInvalid(); + this->throwIfInvalid(); Promise p; - p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); + p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler()); // grab the Future now before we lose our handle on the Promise auto f = p.getFuture(); - f.core_->setExecutorNoLock(getExecutor()); + f.core_->setExecutorNoLock(this->getExecutor()); /* This is a bit tricky. @@ -149,27 +316,27 @@ Future::thenImplementation(F&& func, detail::argResult) { persist beyond the callback, if it gets moved), and so it is an optimization to just make it shared from the get-go. - Two subtle but important points about this design. detail::Core has no - back pointers to Future or Promise, so if Future or Promise get moved - (and they will be moved in performant code) we don't have to do + Two subtle but important points about this design. futures::detail::Core + has no back pointers to Future or Promise, so if Future or Promise get + moved (and they will be moved in performant code) we don't have to do anything fancy. And because we store the continuation in the - detail::Core, not in the Future, we can execute the continuation even - after the Future has gone out of scope. This is an intentional design + futures::detail::Core, not in the Future, we can execute the continuation + even after the Future has gone out of scope. This is an intentional design decision. It is likely we will want to be able to cancel a continuation in some circumstances, but I think it should be explicit not implicit in the destruction of the Future used to create it. */ - setCallback_( - [ func = std::forward(func), pm = std::move(p) ](Try && t) mutable { + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { + if (!isTry && t.hasException()) { - pm.setException(std::move(t.exception())); + state.setException(std::move(t.exception())); } else { - pm.setWith([&]() { - return std::move(func)(t.template get()...); - }); + state.setTry(makeTryWith( + [&] { return state.invoke(t.template get()...); })); } }); - return f; } @@ -178,71 +345,311 @@ Future::thenImplementation(F&& func, detail::argResult) { template template typename std::enable_if::type -Future::thenImplementation(F&& func, detail::argResult) { +FutureBase::thenImplementation( + F&& func, + futures::detail::argResult) { static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument"); typedef typename R::ReturnsFuture::Inner B; - - throwIfInvalid(); + this->throwIfInvalid(); Promise p; - p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); + p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler()); // grab the Future now before we lose our handle on the Promise auto f = p.getFuture(); - f.core_->setExecutorNoLock(getExecutor()); + f.core_->setExecutorNoLock(this->getExecutor()); - setCallback_([ func = std::forward(func), pm = std::move(p) ]( - Try && t) mutable { - auto ew = [&] { - if (!isTry && t.hasException()) { - return std::move(t.exception()); - } else { - try { - auto f2 = std::move(func)(t.template get()...); - // that didn't throw, now we can steal p - f2.setCallback_([p = std::move(pm)](Try && b) mutable { - p.setTry(std::move(b)); - }); - return exception_wrapper(); - } catch (const std::exception& e) { - return exception_wrapper(std::current_exception(), e); - } catch (...) { - return exception_wrapper(std::current_exception()); + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { + if (!isTry && t.hasException()) { + state.setException(std::move(t.exception())); + } else { + auto tf2 = state.tryInvoke(t.template get()...); + if (tf2.hasException()) { + state.setException(std::move(tf2.exception())); + } else { + tf2->setCallback_([p = state.stealPromise()](Try && b) mutable { + p.setTry(std::move(b)); + }); + } } - } - }(); - if (ew) { - pm.setException(std::move(ew)); + }); + + return f; +} +} // namespace detail +} // namespace futures + +template +SemiFuture::type> makeSemiFuture(T&& t) { + return makeSemiFuture(Try::type>(std::forward(t))); +} + +// makeSemiFutureWith(SemiFuture()) -> SemiFuture +template +typename std::enable_if< + isSemiFuture::type>::value, + typename std::result_of::type>::type +makeSemiFutureWith(F&& func) { + using InnerType = + typename isSemiFuture::type>::Inner; + try { + return std::forward(func)(); + } catch (std::exception& e) { + return makeSemiFuture( + exception_wrapper(std::current_exception(), e)); + } catch (...) { + return makeSemiFuture( + exception_wrapper(std::current_exception())); + } +} + +// makeSemiFutureWith(T()) -> SemiFuture +// makeSemiFutureWith(void()) -> SemiFuture +template +typename std::enable_if< + !(isSemiFuture::type>::value), + SemiFuture::type>>>::type +makeSemiFutureWith(F&& func) { + using LiftedResult = Unit::LiftT::type>; + return makeSemiFuture( + makeTryWith([&func]() mutable { return std::forward(func)(); })); +} + +template +SemiFuture makeSemiFuture(std::exception_ptr const& e) { + return makeSemiFuture(Try(e)); +} + +template +SemiFuture makeSemiFuture(exception_wrapper ew) { + return makeSemiFuture(Try(std::move(ew))); +} + +template +typename std:: + enable_if::value, SemiFuture>::type + makeSemiFuture(E const& e) { + return makeSemiFuture(Try(make_exception_wrapper(e))); +} + +template +SemiFuture makeSemiFuture(Try&& t) { + return SemiFuture(new futures::detail::Core(std::move(t))); +} + +// This must be defined after the constructors to avoid a bug in MSVC +// https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244 +inline SemiFuture makeSemiFuture() { + return makeSemiFuture(Unit{}); +} + +template +SemiFuture SemiFuture::makeEmpty() { + return SemiFuture(futures::detail::EmptyConstruct{}); +} + +template +SemiFuture::SemiFuture(SemiFuture&& other) noexcept + : futures::detail::FutureBase(std::move(other)) {} + +template +SemiFuture::SemiFuture(Future&& other) noexcept + : futures::detail::FutureBase(std::move(other)) { + // SemiFuture should not have an executor on construction + if (this->core_) { + this->setExecutor(nullptr); + } +} + +template +SemiFuture& SemiFuture::operator=(SemiFuture&& other) noexcept { + this->assign(other); + return *this; +} + +template +SemiFuture& SemiFuture::operator=(Future&& other) noexcept { + this->assign(other); + // SemiFuture should not have an executor on construction + if (this->core_) { + this->setExecutor(nullptr); + } + return *this; +} + +template +void SemiFuture::boost_() { + // If a SemiFuture has an executor it should be deferred, so boost it + if (auto e = this->getExecutor()) { + // We know in a SemiFuture that if we have an executor it should be + // DeferredExecutor. Verify this in debug mode. + DCHECK(nullptr != dynamic_cast(e)); + + auto ka = static_cast(e)->getKeepAliveToken(); + static_cast(e)->boost(); + } +} + +template +inline Future SemiFuture::via(Executor* executor, int8_t priority) && { + throwIfInvalid(); + if (!executor) { + throwNoExecutor(); + } + + // If current executor is deferred, boost block to ensure that work + // progresses and is run on the new executor. + auto oldExecutor = this->getExecutor(); + if (oldExecutor && executor && (executor != oldExecutor)) { + // We know in a SemiFuture that if we have an executor it should be + // DeferredExecutor. Verify this in debug mode. + DCHECK(nullptr != dynamic_cast(this->getExecutor())); + if (static_cast(oldExecutor)) { + executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() { + static_cast(oldExecutorKA.get())->boost(); + }); } + } + + this->setExecutor(executor, priority); + + auto newFuture = Future(this->core_); + this->core_ = nullptr; + return newFuture; +} + +template +template +SemiFuture::Return::value_type> +SemiFuture::defer(F&& func) && { + // If we already have a deferred executor, use it, otherwise create one + auto defKeepAlive = this->getExecutor() + ? this->getExecutor()->getKeepAliveToken() + : DeferredExecutor::create(); + auto e = defKeepAlive.get(); + // We know in a SemiFuture that if we have an executor it should be + // DeferredExecutor (either it was that way before, or we just created it). + // Verify this in debug mode. + DCHECK(nullptr != dynamic_cast(e)); + // Convert to a folly::future with a deferred executor + // Will be low-cost if this is not a new executor as via optimises for that + // case + auto sf = + std::move(*this) + .via(e) + // Then add the work, with a wrapper function that captures the + // keepAlive so the executor is destroyed at the right time. + .then( + DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func))) + // Finally, convert back o a folly::SemiFuture to hide the executor + .semi(); + // Carry deferred executor through chain as constructor from Future will + // nullify it + sf.setExecutor(e); + return sf; +} + +template +Future Future::makeEmpty() { + return Future(futures::detail::EmptyConstruct{}); +} + +template +Future::Future(Future&& other) noexcept + : futures::detail::FutureBase(std::move(other)) {} + +template +Future& Future::operator=(Future&& other) noexcept { + this->assign(other); + return *this; +} + +template +template < + class T2, + typename std::enable_if< + !std::is_same::type>::value && + std::is_constructible::value && + std::is_convertible::value, + int>::type> +Future::Future(Future&& other) + : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {} + +template +template < + class T2, + typename std::enable_if< + !std::is_same::type>::value && + std::is_constructible::value && + !std::is_convertible::value, + int>::type> +Future::Future(Future&& other) + : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {} + +template +template < + class T2, + typename std::enable_if< + !std::is_same::type>::value && + std::is_constructible::value, + int>::type> +Future& Future::operator=(Future&& other) { + return operator=( + std::move(other).then([](T2&& v) { return T(std::move(v)); })); +} + +// unwrap + +template +template +typename std:: + enable_if::value, Future::Inner>>::type + Future::unwrap() { + return then([](Future::Inner> internal_future) { + return internal_future; }); +} - return f; +template +inline Future Future::via(Executor* executor, int8_t priority) && { + this->throwIfInvalid(); + + this->setExecutor(executor, priority); + + auto newFuture = Future(this->core_); + this->core_ = nullptr; + return newFuture; +} + +template +inline Future Future::via(Executor* executor, int8_t priority) & { + this->throwIfInvalid(); + Promise p; + auto f = p.getFuture(); + auto func = [p = std::move(p)](Try&& t) mutable { + p.setTry(std::move(t)); + }; + using R = futures::detail::callableResult; + this->template thenImplementation( + std::move(func), typename R::Arg()); + return std::move(f).via(executor, priority); } template template Future::Inner> Future::then(R(Caller::*func)(Args...), Caller *instance) { - typedef typename std::remove_cv< - typename std::remove_reference< - typename detail::ArgType::FirstArg>::type>::type FirstArg; + typedef typename std::remove_cv::FirstArg>::type>::type + FirstArg; + return then([instance, func](Try&& t){ return (instance->*func)(t.template get::value, Args>()...); }); } -template -template -auto Future::then(Executor* x, Arg&& arg, Args&&... args) - -> decltype(this->then(std::forward(arg), - std::forward(args)...)) -{ - auto oldX = getExecutor(); - setExecutor(x); - return this->then(std::forward(arg), std::forward(args)...). - via(oldX); -} - template Future Future::then() { return then([] () {}); @@ -252,25 +659,29 @@ Future Future::then() { template template typename std::enable_if< - !detail::callableWith::value && - !detail::Extract::ReturnsFuture::value, - Future>::type + !futures::detail::callableWith::value && + !futures::detail::callableWith::value && + !futures::detail::Extract::ReturnsFuture::value, + Future>::type Future::onError(F&& func) { - typedef typename detail::Extract::FirstArg Exn; + typedef std::remove_reference_t< + typename futures::detail::Extract::FirstArg> + Exn; static_assert( - std::is_same::RawReturn, T>::value, + std::is_same::RawReturn, T>::value, "Return type of onError callback must be T or Future"); Promise p; - p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); + p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler()); auto f = p.getFuture(); - setCallback_( - [ func = std::forward(func), pm = std::move(p) ](Try && t) mutable { - if (!t.template withException([&](Exn& e) { - pm.setWith([&] { return std::move(func)(e); }); - })) { - pm.setTry(std::move(t)); + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { + if (auto e = t.template tryGetExceptionObject()) { + state.setTry(makeTryWith([&] { return state.invoke(*e); })); + } else { + state.setTry(std::move(t)); } }); @@ -281,41 +692,38 @@ Future::onError(F&& func) { template template typename std::enable_if< - !detail::callableWith::value && - detail::Extract::ReturnsFuture::value, - Future>::type + !futures::detail::callableWith::value && + !futures::detail::callableWith::value && + futures::detail::Extract::ReturnsFuture::value, + Future>::type Future::onError(F&& func) { static_assert( - std::is_same::Return, Future>::value, + std::is_same::Return, Future>:: + value, "Return type of onError callback must be T or Future"); - typedef typename detail::Extract::FirstArg Exn; + typedef std::remove_reference_t< + typename futures::detail::Extract::FirstArg> + Exn; Promise p; auto f = p.getFuture(); - setCallback_([ pm = std::move(p), func = std::forward(func) ]( - Try && t) mutable { - if (!t.template withException([&](Exn& e) { - auto ew = [&] { - try { - auto f2 = std::move(func)(e); - f2.setCallback_([pm = std::move(pm)](Try && t2) mutable { - pm.setTry(std::move(t2)); - }); - return exception_wrapper(); - } catch (const std::exception& e2) { - return exception_wrapper(std::current_exception(), e2); - } catch (...) { - return exception_wrapper(std::current_exception()); - } - }(); - if (ew) { - pm.setException(std::move(ew)); + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { + if (auto e = t.template tryGetExceptionObject()) { + auto tf2 = state.tryInvoke(*e); + if (tf2.hasException()) { + state.setException(std::move(tf2.exception())); + } else { + tf2->setCallback_([p = state.stealPromise()](Try && t3) mutable { + p.setTry(std::move(t3)); + }); } - })) { - pm.setTry(std::move(t)); - } - }); + } else { + state.setTry(std::move(t)); + } + }); return f; } @@ -324,7 +732,7 @@ template template Future Future::ensure(F&& func) { return this->then([funcw = std::forward(func)](Try && t) mutable { - funcw(); + std::move(funcw)(); return makeFuture(std::move(t)); }); } @@ -333,42 +741,37 @@ template template Future Future::onTimeout(Duration dur, F&& func, Timekeeper* tk) { return within(dur, tk).onError([funcw = std::forward(func)]( - TimedOut const&) { return funcw(); }); + TimedOut const&) { return std::move(funcw)(); }); } template template -typename std::enable_if::value && - detail::Extract::ReturnsFuture::value, - Future>::type +typename std::enable_if< + futures::detail::callableWith::value && + futures::detail::Extract::ReturnsFuture::value, + Future>::type Future::onError(F&& func) { static_assert( - std::is_same::Return, Future>::value, + std::is_same::Return, Future>:: + value, "Return type of onError callback must be T or Future"); Promise p; auto f = p.getFuture(); - setCallback_( - [ pm = std::move(p), func = std::forward(func) ](Try t) mutable { + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try t) mutable { if (t.hasException()) { - auto ew = [&] { - try { - auto f2 = std::move(func)(std::move(t.exception())); - f2.setCallback_([pm = std::move(pm)](Try t2) mutable { - pm.setTry(std::move(t2)); - }); - return exception_wrapper(); - } catch (const std::exception& e2) { - return exception_wrapper(std::current_exception(), e2); - } catch (...) { - return exception_wrapper(std::current_exception()); - } - }(); - if (ew) { - pm.setException(std::move(ew)); + auto tf2 = state.tryInvoke(std::move(t.exception())); + if (tf2.hasException()) { + state.setException(std::move(tf2.exception())); + } else { + tf2->setCallback_([p = state.stealPromise()](Try && t3) mutable { + p.setTry(std::move(t3)); + }); } } else { - pm.setTry(std::move(t)); + state.setTry(std::move(t)); } }); @@ -379,111 +782,38 @@ Future::onError(F&& func) { template template typename std::enable_if< - detail::callableWith::value && - !detail::Extract::ReturnsFuture::value, - Future>::type + futures::detail::callableWith::value && + !futures::detail::Extract::ReturnsFuture::value, + Future>::type Future::onError(F&& func) { static_assert( - std::is_same::Return, Future>::value, + std::is_same::Return, Future>:: + value, "Return type of onError callback must be T or Future"); Promise p; auto f = p.getFuture(); - setCallback_( - [ pm = std::move(p), func = std::forward(func) ](Try t) mutable { + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { if (t.hasException()) { - pm.setWith([&] { return std::move(func)(std::move(t.exception())); }); + state.setTry(makeTryWith( + [&] { return state.invoke(std::move(t.exception())); })); } else { - pm.setTry(std::move(t)); + state.setTry(std::move(t)); } }); return f; } -template -typename std::add_lvalue_reference::type Future::value() { - throwIfInvalid(); - - return core_->getTry().value(); -} - -template -typename std::add_lvalue_reference::type Future::value() const { - throwIfInvalid(); - - return core_->getTry().value(); -} - -template -Try& Future::getTry() { - throwIfInvalid(); - - return core_->getTry(); -} - -template -Try& Future::getTryVia(DrivableExecutor* e) { - return waitVia(e).getTry(); -} - -template -Optional> Future::poll() { - Optional> o; - if (core_->ready()) { - o = std::move(core_->getTry()); - } - return o; -} - -template -inline Future Future::via(Executor* executor, int8_t priority) && { - throwIfInvalid(); - - setExecutor(executor, priority); - - return std::move(*this); -} - -template -inline Future Future::via(Executor* executor, int8_t priority) & { - throwIfInvalid(); - - Promise p; - auto f = p.getFuture(); - then([p = std::move(p)](Try && t) mutable { p.setTry(std::move(t)); }); - return std::move(f).via(executor, priority); -} - template auto via(Executor* x, Func&& func) - -> Future::Inner> -{ + -> Future()())>::Inner> { // TODO make this actually more performant. :-P #7260175 return via(x).then(std::forward(func)); } -template -bool Future::isReady() const { - throwIfInvalid(); - return core_->ready(); -} - -template -bool Future::hasValue() { - return getTry().hasValue(); -} - -template -bool Future::hasException() { - return getTry().hasException(); -} - -template -void Future::raise(exception_wrapper exception) { - core_->raise(std::move(exception)); -} - // makeFuture template @@ -491,8 +821,7 @@ Future::type> makeFuture(T&& t) { return makeFuture(Try::type>(std::forward(t))); } -inline // for multiple translation units -Future makeFuture() { +inline Future makeFuture() { return makeFuture(Unit{}); } @@ -504,7 +833,7 @@ makeFutureWith(F&& func) { using InnerType = typename isFuture::type>::Inner; try { - return func(); + return std::forward(func)(); } catch (std::exception& e) { return makeFuture( exception_wrapper(std::current_exception(), e)); @@ -518,13 +847,11 @@ makeFutureWith(F&& func) { template typename std::enable_if< !(isFuture::type>::value), - Future::type>::type>>::type + Future::type>>>::type makeFutureWith(F&& func) { - using LiftedResult = - typename Unit::Lift::type>::type; - return makeFuture(makeTryWith([&func]() mutable { - return func(); - })); + using LiftedResult = Unit::LiftT::type>; + return makeFuture( + makeTryWith([&func]() mutable { return std::forward(func)(); })); } template @@ -546,7 +873,7 @@ makeFuture(E const& e) { template Future makeFuture(Try&& t) { - return Future(new detail::Core(std::move(t))); + return Future(new futures::detail::Core(std::move(t))); } // via @@ -568,13 +895,13 @@ void mapSetCallback(InputIterator first, InputIterator last, F func) { // collectAll (variadic) template -typename detail::CollectAllVariadicContext< - typename std::decay::type::value_type...>::type +typename futures::detail::CollectAllVariadicContext< + typename std::decay::type::value_type...>::type collectAll(Fs&&... fs) { - auto ctx = std::make_shared::type::value_type...>>(); - detail::collectVariadicHelper( - ctx, std::forward::type>(fs)...); + auto ctx = std::make_shared::type::value_type...>>(); + futures::detail::collectVariadicHelper< + futures::detail::CollectAllVariadicContext>(ctx, std::forward(fs)...); return ctx->p.getFuture(); } @@ -607,6 +934,7 @@ collectAll(InputIterator first, InputIterator last) { // collect (iterator) +namespace futures { namespace detail { template @@ -645,17 +973,18 @@ struct CollectContext { std::atomic threw {false}; }; -} +} // namespace detail +} // namespace futures template -Future::value_type::value_type>::Result> +Future::value_type::value_type>::Result> collect(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - auto ctx = std::make_shared>( - std::distance(first, last)); + auto ctx = std::make_shared>( + std::distance(first, last)); mapSetCallback(first, last, [ctx](size_t i, Try&& t) { if (t.hasException()) { if (!ctx->threw.exchange(true)) { @@ -671,13 +1000,13 @@ collect(InputIterator first, InputIterator last) { // collect (variadic) template -typename detail::CollectVariadicContext< - typename std::decay::type::value_type...>::type +typename futures::detail::CollectVariadicContext< + typename std::decay::type::value_type...>::type collect(Fs&&... fs) { - auto ctx = std::make_shared::type::value_type...>>(); - detail::collectVariadicHelper( - ctx, std::forward::type>(fs)...); + auto ctx = std::make_shared::type::value_type...>>(); + futures::detail::collectVariadicHelper< + futures::detail::CollectVariadicContext>(ctx, std::forward(fs)...); return ctx->p.getFuture(); } @@ -786,10 +1115,10 @@ Future reduce(It first, It last, T&& initial, F&& func) { } typedef typename std::iterator_traits::value_type::value_type ItT; - typedef - typename std::conditional&&>::value, - Try, - ItT>::type Arg; + typedef typename std::conditional< + futures::detail::callableWith&&>::value, + Try, + ItT>::type Arg; typedef isTry IsTry; auto sfunc = std::make_shared(std::move(func)); @@ -817,27 +1146,38 @@ Future reduce(It first, It last, T&& initial, F&& func) { template std::vector> window(Collection input, F func, size_t n) { - struct WindowContext { - WindowContext(Collection&& i, F&& fn) - : input_(std::move(i)), promises_(input_.size()), - func_(std::move(fn)) - {} - std::atomic i_ {0}; - Collection input_; - std::vector> promises_; - F func_; + // Use global inline executor singleton + auto executor = &InlineExecutor::instance(); + return window(executor, std::move(input), std::move(func), n); +} - static inline void spawn(const std::shared_ptr& ctx) { - size_t i = ctx->i_++; - if (i < ctx->input_.size()) { - // Using setCallback_ directly since we don't need the Future - ctx->func_(std::move(ctx->input_[i])).setCallback_( - // ctx is captured by value - [ctx, i](Try&& t) { - ctx->promises_[i].setTry(std::move(t)); +template +std::vector> +window(Executor* executor, Collection input, F func, size_t n) { + struct WindowContext { + WindowContext(Executor* executor_, Collection&& input_, F&& func_) + : executor(executor_), + input(std::move(input_)), + promises(input.size()), + func(std::move(func_)) {} + std::atomic i{0}; + Executor* executor; + Collection input; + std::vector> promises; + F func; + + static inline void spawn(std::shared_ptr ctx) { + size_t i = ctx->i++; + if (i < ctx->input.size()) { + auto fut = ctx->func(std::move(ctx->input[i])); + fut.setCallback_([ctx = std::move(ctx), i](Try&& t) mutable { + const auto executor_ = ctx->executor; + executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable { + ctx->promises[i].setTry(std::move(t)); // Chain another future onto this one spawn(std::move(ctx)); }); + }); } } }; @@ -845,16 +1185,16 @@ window(Collection input, F func, size_t n) { auto max = std::min(n, input.size()); auto ctx = std::make_shared( - std::move(input), std::move(func)); + executor, std::move(input), std::move(func)); + // Start the first n Futures for (size_t i = 0; i < max; ++i) { - // Start the first n Futures - WindowContext::spawn(ctx); + executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); }); } std::vector> futures; - futures.reserve(ctx->promises_.size()); - for (auto& promise : ctx->promises_) { + futures.reserve(ctx->promises.size()); + for (auto& promise : ctx->promises) { futures.emplace_back(promise.getFuture()); } @@ -950,34 +1290,48 @@ Future Future::within(Duration dur, E e, Timekeeper* tk) { std::atomic token {false}; }; + if (this->isReady()) { + return std::move(*this); + } + std::shared_ptr tks; - if (!tk) { + if (LIKELY(!tk)) { tks = folly::detail::getTimekeeperSingleton(); - tk = DCHECK_NOTNULL(tks.get()); + tk = tks.get(); + } + + if (UNLIKELY(!tk)) { + return makeFuture(NoTimekeeper()); } auto ctx = std::make_shared(std::move(e)); ctx->thisFuture = this->then([ctx](Try&& t) mutable { - // TODO: "this" completed first, cancel "after" if (ctx->token.exchange(true) == false) { ctx->promise.setTry(std::move(t)); } }); - tk->after(dur).then([ctx](Try const& t) mutable { + // Have time keeper use a weak ptr to hold ctx, + // so that ctx can be deallocated as soon as the future job finished. + tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try const& t) mutable { + auto lockedCtx = weakCtx.lock(); + if (!lockedCtx) { + // ctx already released. "this" completed first, cancel "after" + return; + } // "after" completed first, cancel "this" - ctx->thisFuture.raise(TimedOut()); - if (ctx->token.exchange(true) == false) { + lockedCtx->thisFuture.raise(TimedOut()); + if (lockedCtx->token.exchange(true) == false) { if (t.hasException()) { - ctx->promise.setException(std::move(t.exception())); + lockedCtx->promise.setException(std::move(t.exception())); } else { - ctx->promise.setException(std::move(ctx->exception)); + lockedCtx->promise.setException(std::move(lockedCtx->exception)); } } }); - return ctx->promise.getFuture().via(getExecutor()); + return ctx->promise.getFuture().via(this->getExecutor()); } // delayed @@ -985,27 +1339,39 @@ Future Future::within(Duration dur, E e, Timekeeper* tk) { template Future Future::delayed(Duration dur, Timekeeper* tk) { return collectAll(*this, futures::sleep(dur, tk)) - .then([](std::tuple, Try> tup) { - Try& t = std::get<0>(tup); - return makeFuture(std::move(t)); - }); + .then([](std::tuple, Try> tup) { + Try& t = std::get<0>(tup); + return makeFuture(std::move(t)); + }); } +namespace futures { namespace detail { template -void waitImpl(Future& f) { +void doBoost(folly::Future& /* usused */) {} + +template +void doBoost(folly::SemiFuture& f) { + f.boost_(); +} + +template +void waitImpl(FutureType& f) { // short-circuit if there's nothing to do - if (f.isReady()) return; + if (f.isReady()) { + return; + } FutureBatonType baton; f.setCallback_([&](const Try& /* t */) { baton.post(); }); + doBoost(f); baton.wait(); assert(f.isReady()); } -template -void waitImpl(Future& f, Duration dur) { +template +void waitImpl(FutureType& f, Duration dur) { // short-circuit if there's nothing to do if (f.isReady()) { return; @@ -1014,12 +1380,13 @@ void waitImpl(Future& f, Duration dur) { Promise promise; auto ret = promise.getFuture(); auto baton = std::make_shared(); - f.setCallback_([ baton, promise = std::move(promise) ](Try && t) mutable { + f.setCallback_([baton, promise = std::move(promise)](Try&& t) mutable { promise.setTry(std::move(t)); baton->post(); }); + doBoost(f); f = std::move(ret); - if (baton->timed_wait(dur)) { + if (baton->try_wait_for(dur)) { assert(f.isReady()); } } @@ -1029,8 +1396,9 @@ void waitViaImpl(Future& f, DrivableExecutor* e) { // Set callback so to ensure that the via executor has something on it // so that once the preceding future triggers this callback, drive will // always have a callback to satisfy it - if (f.isReady()) + if (f.isReady()) { return; + } f = f.via(e).then([](T&& t) { return std::move(t); }); while (!f.isReady()) { e->drive(); @@ -1038,41 +1406,133 @@ void waitViaImpl(Future& f, DrivableExecutor* e) { assert(f.isReady()); } -} // detail +template +void waitViaImpl(SemiFuture& f, DrivableExecutor* e) { + // Set callback so to ensure that the via executor has something on it + // so that once the preceding future triggers this callback, drive will + // always have a callback to satisfy it + if (f.isReady()) { + return; + } + f = std::move(f).via(e).then([](T&& t) { return std::move(t); }); + while (!f.isReady()) { + e->drive(); + } + assert(f.isReady()); +} + +} // namespace detail +} // namespace futures + +template +SemiFuture& SemiFuture::wait() & { + futures::detail::waitImpl(*this); + return *this; +} + +template +SemiFuture&& SemiFuture::wait() && { + futures::detail::waitImpl(*this); + return std::move(*this); +} + +template +SemiFuture& SemiFuture::wait(Duration dur) & { + futures::detail::waitImpl(*this, dur); + return *this; +} + +template +SemiFuture&& SemiFuture::wait(Duration dur) && { + futures::detail::waitImpl(*this, dur); + return std::move(*this); +} + +template +SemiFuture& SemiFuture::waitVia(DrivableExecutor* e) & { + futures::detail::waitViaImpl(*this, e); + return *this; +} + +template +SemiFuture&& SemiFuture::waitVia(DrivableExecutor* e) && { + futures::detail::waitViaImpl(*this, e); + return std::move(*this); +} + +template +T SemiFuture::get() && { + return std::move(wait()).value(); +} + +template +T SemiFuture::get(Duration dur) && { + wait(dur); + if (this->isReady()) { + return std::move(this->value()); + } else { + throwTimedOut(); + } +} + +template +Try SemiFuture::getTry() && { + return std::move(wait()).result(); +} + +template +Try SemiFuture::getTry(Duration dur) && { + wait(dur); + if (this->isReady()) { + return std::move(this->result()); + } else { + throwTimedOut(); + } +} + +template +T SemiFuture::getVia(DrivableExecutor* e) && { + return std::move(waitVia(e)).value(); +} + +template +Try SemiFuture::getTryVia(DrivableExecutor* e) && { + return std::move(waitVia(e)).result(); +} template Future& Future::wait() & { - detail::waitImpl(*this); + futures::detail::waitImpl(*this); return *this; } template Future&& Future::wait() && { - detail::waitImpl(*this); + futures::detail::waitImpl(*this); return std::move(*this); } template Future& Future::wait(Duration dur) & { - detail::waitImpl(*this, dur); + futures::detail::waitImpl(*this, dur); return *this; } template Future&& Future::wait(Duration dur) && { - detail::waitImpl(*this, dur); + futures::detail::waitImpl(*this, dur); return std::move(*this); } template Future& Future::waitVia(DrivableExecutor* e) & { - detail::waitViaImpl(*this, e); + futures::detail::waitViaImpl(*this, e); return *this; } template Future&& Future::waitVia(DrivableExecutor* e) && { - detail::waitViaImpl(*this, e); + futures::detail::waitViaImpl(*this, e); return std::move(*this); } @@ -1084,32 +1544,45 @@ T Future::get() { template T Future::get(Duration dur) { wait(dur); - if (isReady()) { - return std::move(value()); + if (this->isReady()) { + return std::move(this->value()); } else { - throw TimedOut(); + throwTimedOut(); } } +template +Try& Future::getTry() { + return result(); +} + template T Future::getVia(DrivableExecutor* e) { return std::move(waitVia(e).value()); } -namespace detail { - template - struct TryEquals { - static bool equals(const Try& t1, const Try& t2) { - return t1.value() == t2.value(); - } - }; +template +Try& Future::getTryVia(DrivableExecutor* e) { + return waitVia(e).getTry(); } +namespace futures { +namespace detail { +template +struct TryEquals { + static bool equals(const Try& t1, const Try& t2) { + return t1.value() == t2.value(); + } +}; +} // namespace detail +} // namespace futures + template Future Future::willEqual(Future& f) { return collectAll(*this, f).then([](const std::tuple, Try>& t) { if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) { - return detail::TryEquals::equals(std::get<0>(t), std::get<1>(t)); + return futures::detail::TryEquals::equals( + std::get<0>(t), std::get<1>(t)); } else { return false; } @@ -1122,52 +1595,12 @@ Future Future::filter(F&& predicate) { return this->then([p = std::forward(predicate)](T val) { T const& valConstRef = val; if (!p(valConstRef)) { - throw PredicateDoesNotObtain(); + throwPredicateDoesNotObtain(); } return val; }); } -template -template -auto Future::thenMulti(Callback&& fn) - -> decltype(this->then(std::forward(fn))) { - // thenMulti with one callback is just a then - return then(std::forward(fn)); -} - -template -template -auto Future::thenMulti(Callback&& fn, Callbacks&&... fns) - -> decltype(this->then(std::forward(fn)). - thenMulti(std::forward(fns)...)) { - // thenMulti with two callbacks is just then(a).thenMulti(b, ...) - return then(std::forward(fn)). - thenMulti(std::forward(fns)...); -} - -template -template -auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn, - Callbacks&&... fns) - -> decltype(this->then(std::forward(fn)). - thenMulti(std::forward(fns)...)) { - // thenMultiExecutor with two callbacks is - // via(x).then(a).thenMulti(b, ...).via(oldX) - auto oldX = getExecutor(); - setExecutor(x); - return then(std::forward(fn)). - thenMulti(std::forward(fns)...).via(oldX); -} - -template -template -auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn) - -> decltype(this->then(std::forward(fn))) { - // thenMulti with one callback is just a then with an executor - return then(x, std::forward(fn)); -} - template inline Future when(bool p, F&& thunk) { return p ? std::forward(thunk)().unit() : makeFuture(); @@ -1197,241 +1630,15 @@ Future times(const int n, F&& thunk) { } namespace futures { - template - std::vector> map(It first, It last, F func) { - std::vector> results; - for (auto it = first; it != last; it++) { - results.push_back(it->then(func)); - } - return results; +template +std::vector> map(It first, It last, F func) { + std::vector> results; + for (auto it = first; it != last; it++) { + results.push_back(it->then(func)); } + return results; } - -namespace futures { - -namespace detail { - -struct retrying_policy_raw_tag {}; -struct retrying_policy_fut_tag {}; - -template -struct retrying_policy_traits { - using ew = exception_wrapper; - FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator()); - template - using has_op = typename std::integral_constant::value || - has_op_call::value>; - using is_raw = has_op; - using is_fut = has_op>; - using tag = typename std::conditional< - is_raw::value, retrying_policy_raw_tag, typename std::conditional< - is_fut::value, retrying_policy_fut_tag, void>::type>::type; -}; - -template -void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) { - using F = typename std::result_of::type; - using T = typename F::value_type; - auto f = makeFutureWith([&] { return ff(k++); }); - f.then([ - k, - prom = std::move(prom), - pm = std::forward(p), - ffm = std::forward(ff) - ](Try && t) mutable { - if (t.hasValue()) { - prom.setValue(std::move(t).value()); - return; - } - auto& x = t.exception(); - auto q = pm(k, x); - q.then([ - k, - prom = std::move(prom), - xm = std::move(x), - pm = std::move(pm), - ffm = std::move(ffm) - ](bool shouldRetry) mutable { - if (shouldRetry) { - retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom)); - } else { - prom.setException(std::move(xm)); - }; - }); - }); -} - -template -typename std::result_of::type -retrying(size_t k, Policy&& p, FF&& ff) { - using F = typename std::result_of::type; - using T = typename F::value_type; - auto prom = Promise(); - auto f = prom.getFuture(); - retryingImpl( - k, std::forward(p), std::forward(ff), std::move(prom)); - return f; -} - -template -typename std::result_of::type -retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) { - auto q = [pm = std::forward(p)](size_t k, exception_wrapper x) { - return makeFuture(pm(k, x)); - }; - return retrying(0, std::move(q), std::forward(ff)); -} - -template -typename std::result_of::type -retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) { - return retrying(0, std::forward(p), std::forward(ff)); -} - -// jittered exponential backoff, clamped to [backoff_min, backoff_max] -template -Duration retryingJitteredExponentialBackoffDur( - size_t n, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG& rng) { - using d = Duration; - auto dist = std::normal_distribution(0.0, jitter_param); - auto jitter = std::exp(dist(rng)); - auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1))); - return std::max(backoff_min, std::min(backoff_max, backoff)); -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p) { - return [ - pm = std::forward(p), - max_tries, - backoff_min, - backoff_max, - jitter_param, - rngp = std::forward(rng) - ](size_t n, const exception_wrapper& ex) mutable { - if (n == max_tries) { - return makeFuture(false); - } - return pm(n, ex).then( - [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ]( - bool v) mutable { - if (!v) { - return makeFuture(false); - } - auto backoff = detail::retryingJitteredExponentialBackoffDur( - n, backoff_min, backoff_max, jitter_param, rngp); - return futures::sleep(backoff).then([] { return true; }); - }); - }; -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p, - retrying_policy_raw_tag) { - auto q = [pm = std::forward(p)]( - size_t n, const exception_wrapper& e) { - return makeFuture(pm(n, e)); - }; - return retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - std::forward(rng), - std::move(q)); -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p, - retrying_policy_fut_tag) { - return retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - std::forward(rng), - std::forward(p)); -} -} - -template -typename std::result_of::type -retrying(Policy&& p, FF&& ff) { - using tag = typename detail::retrying_policy_traits::tag; - return detail::retrying(std::forward(p), std::forward(ff), tag()); -} - -inline -std::function -retryingPolicyBasic( - size_t max_tries) { - return [=](size_t n, const exception_wrapper&) { return n < max_tries; }; -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p) { - using tag = typename detail::retrying_policy_traits::tag; - return detail::retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - std::forward(rng), - std::forward(p), - tag()); -} - -inline -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param) { - auto p = [](size_t, const exception_wrapper&) { return true; }; - return retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - ThreadLocalPRNG(), - std::move(p)); -} - -} +} // namespace futures // Instantiate the most common Future types to save compile time extern template class Future; @@ -1440,5 +1647,4 @@ extern template class Future; extern template class Future; extern template class Future; extern template class Future; - } // namespace folly