X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Ffutures%2FFuture-inl.h;h=70a2c581ee6d228de585a52ed4064480df7d4c37;hb=7ebe7c2a249de44209e8bcc453e1a70bafd4ed19;hp=e1e34956a6aa3cffc96d75072a80aa9b60e038cb;hpb=0be5c0a490a250b05590c62a9c70b6f654bfb546;p=folly.git diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index e1e34956..70a2c581 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -19,37 +19,43 @@ #include #include #include -#include #include + #include #include -#include -#include -#include +#include #include +#include +#ifndef FOLLY_FUTURE_USING_FIBER #if FOLLY_MOBILE || defined(__APPLE__) #define FOLLY_FUTURE_USING_FIBER 0 #else #define FOLLY_FUTURE_USING_FIBER 1 #include #endif +#endif namespace folly { class Timekeeper; +namespace futures { namespace detail { #if FOLLY_FUTURE_USING_FIBER typedef folly::fibers::Baton FutureBatonType; #else typedef folly::Baton<> FutureBatonType; #endif -} +} // namespace detail +} // namespace futures namespace detail { std::shared_ptr getTimekeeperSingleton(); +} // namespace detail +namespace futures { +namespace detail { // Guarantees that the stored functor is destructed before the stored promise // may be fulfilled. Assumes the stored functor to be noexcept-destructible. template @@ -85,6 +91,11 @@ class CoreCallbackState { return std::move(func_)(std::forward(args)...); } + template + auto tryInvoke(Args&&... args) noexcept { + return makeTryWith([&] { return invoke(std::forward(args)...); }); + } + void setTry(Try&& t) { stealPromise().setTry(std::move(t)); } @@ -107,7 +118,7 @@ class CoreCallbackState { union { F func_; }; - Promise promise_{detail::EmptyConstruct{}}; + Promise promise_{Promise::makeEmpty()}; }; template @@ -118,36 +129,196 @@ inline auto makeCoreCallbackState(Promise&& p, F&& f) noexcept( return CoreCallbackState>>( std::move(p), std::forward(f)); } +} // namespace detail +} // namespace futures + +template +SemiFuture::type> makeSemiFuture(T&& t) { + return makeSemiFuture(Try::type>(std::forward(t))); +} + +// makeSemiFutureWith(SemiFuture()) -> SemiFuture +template +typename std::enable_if< + isSemiFuture::type>::value, + typename std::result_of::type>::type +makeSemiFutureWith(F&& func) { + using InnerType = + typename isSemiFuture::type>::Inner; + try { + return std::forward(func)(); + } catch (std::exception& e) { + return makeSemiFuture( + exception_wrapper(std::current_exception(), e)); + } catch (...) { + return makeSemiFuture( + exception_wrapper(std::current_exception())); + } +} + +// makeSemiFutureWith(T()) -> SemiFuture +// makeSemiFutureWith(void()) -> SemiFuture +template +typename std::enable_if< + !(isSemiFuture::type>::value), + SemiFuture::type>>>::type +makeSemiFutureWith(F&& func) { + using LiftedResult = Unit::LiftT::type>; + return makeSemiFuture( + makeTryWith([&func]() mutable { return std::forward(func)(); })); +} + +template +SemiFuture makeSemiFuture(std::exception_ptr const& e) { + return makeSemiFuture(Try(e)); +} + +template +SemiFuture makeSemiFuture(exception_wrapper ew) { + return makeSemiFuture(Try(std::move(ew))); +} + +template +typename std:: + enable_if::value, SemiFuture>::type + makeSemiFuture(E const& e) { + return makeSemiFuture(Try(make_exception_wrapper(e))); +} + +template +SemiFuture makeSemiFuture(Try&& t) { + return SemiFuture(new futures::detail::Core(std::move(t))); } template -Future::Future(Future&& other) noexcept : core_(other.core_) { +SemiFuture SemiFuture::makeEmpty() { + return SemiFuture(futures::detail::EmptyConstruct{}); +} + +template +SemiFuture::SemiFuture(SemiFuture&& other) noexcept : core_(other.core_) { other.core_ = nullptr; } template -Future& Future::operator=(Future&& other) noexcept { +SemiFuture& SemiFuture::operator=(SemiFuture&& other) noexcept { + std::swap(core_, other.core_); + return *this; +} + +template +SemiFuture::SemiFuture(Future&& other) noexcept : core_(other.core_) { + other.core_ = nullptr; +} + +template +SemiFuture& SemiFuture::operator=(Future&& other) noexcept { std::swap(core_, other.core_); return *this; } template template -Future::Future(T2&& val) - : core_(new detail::Core(Try(std::forward(val)))) {} +SemiFuture::SemiFuture(T2&& val) + : core_(new futures::detail::Core(Try(std::forward(val)))) {} template template -Future::Future(typename std::enable_if::value>::type*) - : core_(new detail::Core(Try(T()))) {} +SemiFuture::SemiFuture( + typename std::enable_if::value>::type*) + : core_(new futures::detail::Core(Try(T()))) {} template -Future::~Future() { +template < + class... Args, + typename std::enable_if::value, int>:: + type> +SemiFuture::SemiFuture(in_place_t, Args&&... args) + : core_( + new futures::detail::Core(in_place, std::forward(args)...)) { +} + +template +SemiFuture::~SemiFuture() { detach(); } +// This must be defined after the constructors to avoid a bug in MSVC +// https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244 +inline SemiFuture makeSemiFuture() { + return makeSemiFuture(Unit{}); +} + +template +T& SemiFuture::value() & { + throwIfInvalid(); + + return core_->getTry().value(); +} + +template +T const& SemiFuture::value() const& { + throwIfInvalid(); + + return core_->getTry().value(); +} + +template +T&& SemiFuture::value() && { + throwIfInvalid(); + + return std::move(core_->getTry().value()); +} + +template +T const&& SemiFuture::value() const&& { + throwIfInvalid(); + + return std::move(core_->getTry().value()); +} + +template +inline Future SemiFuture::via(Executor* executor, int8_t priority) && { + throwIfInvalid(); + + setExecutor(executor, priority); + + auto newFuture = Future(core_); + core_ = nullptr; + return newFuture; +} + +template +inline Future SemiFuture::via(Executor* executor, int8_t priority) & { + throwIfInvalid(); + Promise p; + auto f = p.getFuture(); + auto func = [p = std::move(p)](Try&& t) mutable { + p.setTry(std::move(t)); + }; + using R = futures::detail::callableResult; + thenImplementation(std::move(func), typename R::Arg()); + return std::move(f).via(executor, priority); +} + +template +bool SemiFuture::isReady() const { + throwIfInvalid(); + return core_->ready(); +} + +template +bool SemiFuture::hasValue() { + return getTry().hasValue(); +} + +template +bool SemiFuture::hasException() { + return getTry().hasException(); +} + template -void Future::detach() { +void SemiFuture::detach() { if (core_) { core_->detachFuture(); core_ = nullptr; @@ -155,18 +326,115 @@ void Future::detach() { } template -void Future::throwIfInvalid() const { - if (!core_) - throw NoState(); +Try& SemiFuture::getTry() { + throwIfInvalid(); + + return core_->getTry(); +} + +template +void SemiFuture::throwIfInvalid() const { + if (!core_) { + throwNoState(); +} +} + +template +Optional> SemiFuture::poll() { + Optional> o; + if (core_->ready()) { + o = std::move(core_->getTry()); + } + return o; +} + +template +void SemiFuture::raise(exception_wrapper exception) { + core_->raise(std::move(exception)); } template template -void Future::setCallback_(F&& func) { +void SemiFuture::setCallback_(F&& func) { throwIfInvalid(); core_->setCallback(std::forward(func)); } +template +SemiFuture::SemiFuture(futures::detail::EmptyConstruct) noexcept + : core_(nullptr) {} + +template +Future Future::makeEmpty() { + return Future(futures::detail::EmptyConstruct{}); +} + +template +Future::Future(Future&& other) noexcept + : SemiFuture(std::move(other)) {} + +template +Future& Future::operator=(Future&& other) noexcept { + SemiFuture::operator=(SemiFuture{std::move(other)}); + return *this; +} + +template +template < + class T2, + typename std::enable_if< + !std::is_same::type>::value && + std::is_constructible::value && + std::is_convertible::value, + int>::type> +Future::Future(Future&& other) + : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {} + +template +template < + class T2, + typename std::enable_if< + !std::is_same::type>::value && + std::is_constructible::value && + !std::is_convertible::value, + int>::type> +Future::Future(Future&& other) + : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {} + +template +template < + class T2, + typename std::enable_if< + !std::is_same::type>::value && + std::is_constructible::value, + int>::type> +Future& Future::operator=(Future&& other) { + return operator=( + std::move(other).then([](T2&& v) { return T(std::move(v)); })); +} + +// TODO: isSemiFuture +template +template +Future::Future(T2&& val) : SemiFuture(std::forward(val)) {} + +template +template +Future::Future(typename std::enable_if::value>::type*) + : SemiFuture() {} + +template +template < + class... Args, + typename std::enable_if::value, int>:: + type> +Future::Future(in_place_t, Args&&... args) + : SemiFuture(in_place, std::forward(args)...) {} + +template +Future::~Future() { +} + // unwrap template @@ -186,18 +454,20 @@ Future::unwrap() { template template typename std::enable_if::type -Future::thenImplementation(F&& func, detail::argResult) { +SemiFuture::thenImplementation( + F&& func, + futures::detail::argResult) { static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument"); typedef typename R::ReturnsFuture::Inner B; - throwIfInvalid(); + this->throwIfInvalid(); Promise p; - p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); + p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler()); // grab the Future now before we lose our handle on the Promise auto f = p.getFuture(); - f.core_->setExecutorNoLock(getExecutor()); + f.core_->setExecutorNoLock(this->getExecutor()); /* This is a bit tricky. @@ -218,19 +488,20 @@ Future::thenImplementation(F&& func, detail::argResult) { persist beyond the callback, if it gets moved), and so it is an optimization to just make it shared from the get-go. - Two subtle but important points about this design. detail::Core has no - back pointers to Future or Promise, so if Future or Promise get moved - (and they will be moved in performant code) we don't have to do + Two subtle but important points about this design. futures::detail::Core + has no back pointers to Future or Promise, so if Future or Promise get + moved (and they will be moved in performant code) we don't have to do anything fancy. And because we store the continuation in the - detail::Core, not in the Future, we can execute the continuation even - after the Future has gone out of scope. This is an intentional design + futures::detail::Core, not in the Future, we can execute the continuation + even after the Future has gone out of scope. This is an intentional design decision. It is likely we will want to be able to cancel a continuation in some circumstances, but I think it should be explicit not implicit in the destruction of the Future used to create it. */ - setCallback_( - [state = detail::makeCoreCallbackState( - std::move(p), std::forward(func))](Try && t) mutable { + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { + if (!isTry && t.hasException()) { state.setException(std::move(t.exception())); } else { @@ -238,7 +509,6 @@ Future::thenImplementation(F&& func, detail::argResult) { [&] { return state.invoke(t.template get()...); })); } }); - return f; } @@ -247,42 +517,34 @@ Future::thenImplementation(F&& func, detail::argResult) { template template typename std::enable_if::type -Future::thenImplementation(F&& func, detail::argResult) { +SemiFuture::thenImplementation( + F&& func, + futures::detail::argResult) { static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument"); typedef typename R::ReturnsFuture::Inner B; - - throwIfInvalid(); + this->throwIfInvalid(); Promise p; - p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); + p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler()); // grab the Future now before we lose our handle on the Promise auto f = p.getFuture(); - f.core_->setExecutorNoLock(getExecutor()); - - setCallback_( - [state = detail::makeCoreCallbackState( - std::move(p), std::forward(func))](Try && t) mutable { - auto ew = [&] { - if (!isTry && t.hasException()) { - return std::move(t.exception()); + f.core_->setExecutorNoLock(this->getExecutor()); + + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { + if (!isTry && t.hasException()) { + state.setException(std::move(t.exception())); + } else { + auto tf2 = state.tryInvoke(t.template get()...); + if (tf2.hasException()) { + state.setException(std::move(tf2.exception())); } else { - try { - auto f2 = state.invoke(t.template get()...); - // that didn't throw, now we can steal p - f2.setCallback_([p = state.stealPromise()](Try && b) mutable { - p.setTry(std::move(b)); - }); - return exception_wrapper(); - } catch (const std::exception& e) { - return exception_wrapper(std::current_exception(), e); - } catch (...) { - return exception_wrapper(std::current_exception()); - } + tf2->setCallback_([p = state.stealPromise()](Try && b) mutable { + p.setTry(std::move(b)); + }); } - }(); - if (ew) { - state.setException(std::move(ew)); } }); @@ -293,26 +555,15 @@ template template Future::Inner> Future::then(R(Caller::*func)(Args...), Caller *instance) { - typedef typename std::remove_cv< - typename std::remove_reference< - typename detail::ArgType::FirstArg>::type>::type FirstArg; + typedef typename std::remove_cv::FirstArg>::type>::type + FirstArg; + return then([instance, func](Try&& t){ return (instance->*func)(t.template get::value, Args>()...); }); } -template -template -auto Future::then(Executor* x, Arg&& arg, Args&&... args) - -> decltype(this->then(std::forward(arg), - std::forward(args)...)) -{ - auto oldX = getExecutor(); - setExecutor(x); - return this->then(std::forward(arg), std::forward(args)...). - via(oldX); -} - template Future Future::then() { return then([] () {}); @@ -322,25 +573,28 @@ Future Future::then() { template template typename std::enable_if< - !detail::callableWith::value && - !detail::Extract::ReturnsFuture::value, - Future>::type + !futures::detail::callableWith::value && + !futures::detail::callableWith::value && + !futures::detail::Extract::ReturnsFuture::value, + Future>::type Future::onError(F&& func) { - typedef typename detail::Extract::FirstArg Exn; + typedef std::remove_reference_t< + typename futures::detail::Extract::FirstArg> + Exn; static_assert( - std::is_same::RawReturn, T>::value, + std::is_same::RawReturn, T>::value, "Return type of onError callback must be T or Future"); Promise p; - p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); + p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler()); auto f = p.getFuture(); - setCallback_( - [state = detail::makeCoreCallbackState( - std::move(p), std::forward(func))](Try && t) mutable { - if (!t.template withException([&](Exn& e) { - state.setTry(makeTryWith([&] { return state.invoke(e); })); - })) { + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { + if (auto e = t.template tryGetExceptionObject()) { + state.setTry(makeTryWith([&] { return state.invoke(*e); })); + } else { state.setTry(std::move(t)); } }); @@ -352,38 +606,35 @@ Future::onError(F&& func) { template template typename std::enable_if< - !detail::callableWith::value && - detail::Extract::ReturnsFuture::value, - Future>::type + !futures::detail::callableWith::value && + !futures::detail::callableWith::value && + futures::detail::Extract::ReturnsFuture::value, + Future>::type Future::onError(F&& func) { static_assert( - std::is_same::Return, Future>::value, + std::is_same::Return, Future>:: + value, "Return type of onError callback must be T or Future"); - typedef typename detail::Extract::FirstArg Exn; + typedef std::remove_reference_t< + typename futures::detail::Extract::FirstArg> + Exn; Promise p; auto f = p.getFuture(); - setCallback_( - [state = detail::makeCoreCallbackState( - std::move(p), std::forward(func))](Try && t) mutable { - if (!t.template withException([&](Exn& e) { - auto ew = [&] { - try { - auto f2 = state.invoke(e); - f2.setCallback_([p = state.stealPromise()]( - Try && t2) mutable { p.setTry(std::move(t2)); }); - return exception_wrapper(); - } catch (const std::exception& e2) { - return exception_wrapper(std::current_exception(), e2); - } catch (...) { - return exception_wrapper(std::current_exception()); - } - }(); - if (ew) { - state.setException(std::move(ew)); - } - })) { + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { + if (auto e = t.template tryGetExceptionObject()) { + auto tf2 = state.tryInvoke(*e); + if (tf2.hasException()) { + state.setException(std::move(tf2.exception())); + } else { + tf2->setCallback_([p = state.stealPromise()](Try && t3) mutable { + p.setTry(std::move(t3)); + }); + } + } else { state.setTry(std::move(t)); } }); @@ -395,7 +646,7 @@ template template Future Future::ensure(F&& func) { return this->then([funcw = std::forward(func)](Try && t) mutable { - funcw(); + std::move(funcw)(); return makeFuture(std::move(t)); }); } @@ -404,40 +655,34 @@ template template Future Future::onTimeout(Duration dur, F&& func, Timekeeper* tk) { return within(dur, tk).onError([funcw = std::forward(func)]( - TimedOut const&) { return funcw(); }); + TimedOut const&) { return std::move(funcw)(); }); } template template -typename std::enable_if::value && - detail::Extract::ReturnsFuture::value, - Future>::type +typename std::enable_if< + futures::detail::callableWith::value && + futures::detail::Extract::ReturnsFuture::value, + Future>::type Future::onError(F&& func) { static_assert( - std::is_same::Return, Future>::value, + std::is_same::Return, Future>:: + value, "Return type of onError callback must be T or Future"); Promise p; auto f = p.getFuture(); - setCallback_( - [state = detail::makeCoreCallbackState( + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( std::move(p), std::forward(func))](Try t) mutable { if (t.hasException()) { - auto ew = [&] { - try { - auto f2 = state.invoke(std::move(t.exception())); - f2.setCallback_([p = state.stealPromise()](Try t2) mutable { - p.setTry(std::move(t2)); - }); - return exception_wrapper(); - } catch (const std::exception& e2) { - return exception_wrapper(std::current_exception(), e2); - } catch (...) { - return exception_wrapper(std::current_exception()); - } - }(); - if (ew) { - state.setException(std::move(ew)); + auto tf2 = state.tryInvoke(std::move(t.exception())); + if (tf2.hasException()) { + state.setException(std::move(tf2.exception())); + } else { + tf2->setCallback_([p = state.stealPromise()](Try && t3) mutable { + p.setTry(std::move(t3)); + }); } } else { state.setTry(std::move(t)); @@ -451,19 +696,20 @@ Future::onError(F&& func) { template template typename std::enable_if< - detail::callableWith::value && - !detail::Extract::ReturnsFuture::value, - Future>::type + futures::detail::callableWith::value && + !futures::detail::Extract::ReturnsFuture::value, + Future>::type Future::onError(F&& func) { static_assert( - std::is_same::Return, Future>::value, + std::is_same::Return, Future>:: + value, "Return type of onError callback must be T or Future"); Promise p; auto f = p.getFuture(); - setCallback_( - [state = detail::makeCoreCallbackState( - std::move(p), std::forward(func))](Try t) mutable { + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { if (t.hasException()) { state.setTry(makeTryWith( [&] { return state.invoke(std::move(t.exception())); })); @@ -475,88 +721,21 @@ Future::onError(F&& func) { return f; } -template -typename std::add_lvalue_reference::type Future::value() { - throwIfInvalid(); - - return core_->getTry().value(); -} - -template -typename std::add_lvalue_reference::type Future::value() const { - throwIfInvalid(); - - return core_->getTry().value(); -} - -template -Try& Future::getTry() { - throwIfInvalid(); - - return core_->getTry(); -} - template Try& Future::getTryVia(DrivableExecutor* e) { return waitVia(e).getTry(); } -template -Optional> Future::poll() { - Optional> o; - if (core_->ready()) { - o = std::move(core_->getTry()); - } - return o; -} - -template -inline Future Future::via(Executor* executor, int8_t priority) && { - throwIfInvalid(); - - setExecutor(executor, priority); - - return std::move(*this); -} - -template -inline Future Future::via(Executor* executor, int8_t priority) & { - throwIfInvalid(); - - Promise p; - auto f = p.getFuture(); - then([p = std::move(p)](Try && t) mutable { p.setTry(std::move(t)); }); - return std::move(f).via(executor, priority); -} - template auto via(Executor* x, Func&& func) - -> Future::Inner> -{ + -> Future()())>::Inner> { // TODO make this actually more performant. :-P #7260175 return via(x).then(std::forward(func)); } template -bool Future::isReady() const { - throwIfInvalid(); - return core_->ready(); -} - -template -bool Future::hasValue() { - return getTry().hasValue(); -} - -template -bool Future::hasException() { - return getTry().hasException(); -} - -template -void Future::raise(exception_wrapper exception) { - core_->raise(std::move(exception)); -} +Future::Future(futures::detail::EmptyConstruct) noexcept + : SemiFuture(futures::detail::EmptyConstruct{}) {} // makeFuture @@ -565,8 +744,7 @@ Future::type> makeFuture(T&& t) { return makeFuture(Try::type>(std::forward(t))); } -inline // for multiple translation units -Future makeFuture() { +inline Future makeFuture() { return makeFuture(Unit{}); } @@ -578,7 +756,7 @@ makeFutureWith(F&& func) { using InnerType = typename isFuture::type>::Inner; try { - return func(); + return std::forward(func)(); } catch (std::exception& e) { return makeFuture( exception_wrapper(std::current_exception(), e)); @@ -592,13 +770,11 @@ makeFutureWith(F&& func) { template typename std::enable_if< !(isFuture::type>::value), - Future::type>::type>>::type + Future::type>>>::type makeFutureWith(F&& func) { - using LiftedResult = - typename Unit::Lift::type>::type; - return makeFuture(makeTryWith([&func]() mutable { - return func(); - })); + using LiftedResult = Unit::LiftT::type>; + return makeFuture( + makeTryWith([&func]() mutable { return std::forward(func)(); })); } template @@ -620,7 +796,7 @@ makeFuture(E const& e) { template Future makeFuture(Try&& t) { - return Future(new detail::Core(std::move(t))); + return Future(new futures::detail::Core(std::move(t))); } // via @@ -642,13 +818,13 @@ void mapSetCallback(InputIterator first, InputIterator last, F func) { // collectAll (variadic) template -typename detail::CollectAllVariadicContext< - typename std::decay::type::value_type...>::type +typename futures::detail::CollectAllVariadicContext< + typename std::decay::type::value_type...>::type collectAll(Fs&&... fs) { - auto ctx = std::make_shared::type::value_type...>>(); - detail::collectVariadicHelper( - ctx, std::forward::type>(fs)...); + auto ctx = std::make_shared::type::value_type...>>(); + futures::detail::collectVariadicHelper< + futures::detail::CollectAllVariadicContext>(ctx, std::forward(fs)...); return ctx->p.getFuture(); } @@ -681,6 +857,7 @@ collectAll(InputIterator first, InputIterator last) { // collect (iterator) +namespace futures { namespace detail { template @@ -719,17 +896,18 @@ struct CollectContext { std::atomic threw {false}; }; -} +} // namespace detail +} // namespace futures template -Future::value_type::value_type>::Result> +Future::value_type::value_type>::Result> collect(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - auto ctx = std::make_shared>( - std::distance(first, last)); + auto ctx = std::make_shared>( + std::distance(first, last)); mapSetCallback(first, last, [ctx](size_t i, Try&& t) { if (t.hasException()) { if (!ctx->threw.exchange(true)) { @@ -745,13 +923,13 @@ collect(InputIterator first, InputIterator last) { // collect (variadic) template -typename detail::CollectVariadicContext< - typename std::decay::type::value_type...>::type +typename futures::detail::CollectVariadicContext< + typename std::decay::type::value_type...>::type collect(Fs&&... fs) { - auto ctx = std::make_shared::type::value_type...>>(); - detail::collectVariadicHelper( - ctx, std::forward::type>(fs)...); + auto ctx = std::make_shared::type::value_type...>>(); + futures::detail::collectVariadicHelper< + futures::detail::CollectVariadicContext>(ctx, std::forward(fs)...); return ctx->p.getFuture(); } @@ -860,10 +1038,10 @@ Future reduce(It first, It last, T&& initial, F&& func) { } typedef typename std::iterator_traits::value_type::value_type ItT; - typedef - typename std::conditional&&>::value, - Try, - ItT>::type Arg; + typedef typename std::conditional< + futures::detail::callableWith&&>::value, + Try, + ItT>::type Arg; typedef isTry IsTry; auto sfunc = std::make_shared(std::move(func)); @@ -891,27 +1069,38 @@ Future reduce(It first, It last, T&& initial, F&& func) { template std::vector> window(Collection input, F func, size_t n) { - struct WindowContext { - WindowContext(Collection&& i, F&& fn) - : input_(std::move(i)), promises_(input_.size()), - func_(std::move(fn)) - {} - std::atomic i_ {0}; - Collection input_; - std::vector> promises_; - F func_; + // Use global inline executor singleton + auto executor = &InlineExecutor::instance(); + return window(executor, std::move(input), std::move(func), n); +} - static inline void spawn(const std::shared_ptr& ctx) { - size_t i = ctx->i_++; - if (i < ctx->input_.size()) { - // Using setCallback_ directly since we don't need the Future - ctx->func_(std::move(ctx->input_[i])).setCallback_( - // ctx is captured by value - [ctx, i](Try&& t) { - ctx->promises_[i].setTry(std::move(t)); +template +std::vector> +window(Executor* executor, Collection input, F func, size_t n) { + struct WindowContext { + WindowContext(Executor* executor_, Collection&& input_, F&& func_) + : executor(executor_), + input(std::move(input_)), + promises(input.size()), + func(std::move(func_)) {} + std::atomic i{0}; + Executor* executor; + Collection input; + std::vector> promises; + F func; + + static inline void spawn(std::shared_ptr ctx) { + size_t i = ctx->i++; + if (i < ctx->input.size()) { + auto fut = ctx->func(std::move(ctx->input[i])); + fut.setCallback_([ctx = std::move(ctx), i](Try&& t) mutable { + const auto executor_ = ctx->executor; + executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable { + ctx->promises[i].setTry(std::move(t)); // Chain another future onto this one spawn(std::move(ctx)); }); + }); } } }; @@ -919,16 +1108,16 @@ window(Collection input, F func, size_t n) { auto max = std::min(n, input.size()); auto ctx = std::make_shared( - std::move(input), std::move(func)); + executor, std::move(input), std::move(func)); + // Start the first n Futures for (size_t i = 0; i < max; ++i) { - // Start the first n Futures - WindowContext::spawn(ctx); + executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); }); } std::vector> futures; - futures.reserve(ctx->promises_.size()); - for (auto& promise : ctx->promises_) { + futures.reserve(ctx->promises.size()); + for (auto& promise : ctx->promises) { futures.emplace_back(promise.getFuture()); } @@ -1024,6 +1213,10 @@ Future Future::within(Duration dur, E e, Timekeeper* tk) { std::atomic token {false}; }; + if (this->isReady()) { + return std::move(*this); + } + std::shared_ptr tks; if (!tk) { tks = folly::detail::getTimekeeperSingleton(); @@ -1033,25 +1226,31 @@ Future Future::within(Duration dur, E e, Timekeeper* tk) { auto ctx = std::make_shared(std::move(e)); ctx->thisFuture = this->then([ctx](Try&& t) mutable { - // TODO: "this" completed first, cancel "after" if (ctx->token.exchange(true) == false) { ctx->promise.setTry(std::move(t)); } }); - tk->after(dur).then([ctx](Try const& t) mutable { + // Have time keeper use a weak ptr to hold ctx, + // so that ctx can be deallocated as soon as the future job finished. + tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try const& t) mutable { + auto lockedCtx = weakCtx.lock(); + if (!lockedCtx) { + // ctx already released. "this" completed first, cancel "after" + return; + } // "after" completed first, cancel "this" - ctx->thisFuture.raise(TimedOut()); - if (ctx->token.exchange(true) == false) { + lockedCtx->thisFuture.raise(TimedOut()); + if (lockedCtx->token.exchange(true) == false) { if (t.hasException()) { - ctx->promise.setException(std::move(t.exception())); + lockedCtx->promise.setException(std::move(t.exception())); } else { - ctx->promise.setException(std::move(ctx->exception)); + lockedCtx->promise.setException(std::move(lockedCtx->exception)); } } }); - return ctx->promise.getFuture().via(getExecutor()); + return ctx->promise.getFuture().via(this->getExecutor()); } // delayed @@ -1065,12 +1264,15 @@ Future Future::delayed(Duration dur, Timekeeper* tk) { }); } +namespace futures { namespace detail { -template -void waitImpl(Future& f) { +template +void waitImpl(FutureType& f) { // short-circuit if there's nothing to do - if (f.isReady()) return; + if (f.isReady()) { + return; + } FutureBatonType baton; f.setCallback_([&](const Try& /* t */) { baton.post(); }); @@ -1078,8 +1280,8 @@ void waitImpl(Future& f) { assert(f.isReady()); } -template -void waitImpl(Future& f, Duration dur) { +template +void waitImpl(FutureType& f, Duration dur) { // short-circuit if there's nothing to do if (f.isReady()) { return; @@ -1103,8 +1305,9 @@ void waitViaImpl(Future& f, DrivableExecutor* e) { // Set callback so to ensure that the via executor has something on it // so that once the preceding future triggers this callback, drive will // always have a callback to satisfy it - if (f.isReady()) + if (f.isReady()) { return; + } f = f.via(e).then([](T&& t) { return std::move(t); }); while (!f.isReady()) { e->drive(); @@ -1112,57 +1315,82 @@ void waitViaImpl(Future& f, DrivableExecutor* e) { assert(f.isReady()); } -} // detail +} // namespace detail +} // namespace futures template -Future& Future::wait() & { - detail::waitImpl(*this); +SemiFuture& SemiFuture::wait() & { + futures::detail::waitImpl(*this); return *this; } template -Future&& Future::wait() && { - detail::waitImpl(*this); +SemiFuture&& SemiFuture::wait() && { + futures::detail::waitImpl(*this); return std::move(*this); } template -Future& Future::wait(Duration dur) & { - detail::waitImpl(*this, dur); +SemiFuture& SemiFuture::wait(Duration dur) & { + futures::detail::waitImpl(*this, dur); return *this; } template -Future&& Future::wait(Duration dur) && { - detail::waitImpl(*this, dur); +SemiFuture&& SemiFuture::wait(Duration dur) && { + futures::detail::waitImpl(*this, dur); return std::move(*this); } template -Future& Future::waitVia(DrivableExecutor* e) & { - detail::waitViaImpl(*this, e); +T SemiFuture::get() { + return std::move(wait().value()); +} + +template +T SemiFuture::get(Duration dur) { + wait(dur); + if (this->isReady()) { + return std::move(this->value()); + } else { + throwTimedOut(); + } +} + +template +Future& Future::wait() & { + futures::detail::waitImpl(*this); return *this; } template -Future&& Future::waitVia(DrivableExecutor* e) && { - detail::waitViaImpl(*this, e); +Future&& Future::wait() && { + futures::detail::waitImpl(*this); return std::move(*this); } template -T Future::get() { - return std::move(wait().value()); +Future& Future::wait(Duration dur) & { + futures::detail::waitImpl(*this, dur); + return *this; } template -T Future::get(Duration dur) { - wait(dur); - if (isReady()) { - return std::move(value()); - } else { - throw TimedOut(); - } +Future&& Future::wait(Duration dur) && { + futures::detail::waitImpl(*this, dur); + return std::move(*this); +} + +template +Future& Future::waitVia(DrivableExecutor* e) & { + futures::detail::waitViaImpl(*this, e); + return *this; +} + +template +Future&& Future::waitVia(DrivableExecutor* e) && { + futures::detail::waitViaImpl(*this, e); + return std::move(*this); } template @@ -1170,20 +1398,23 @@ T Future::getVia(DrivableExecutor* e) { return std::move(waitVia(e).value()); } +namespace futures { namespace detail { - template - struct TryEquals { - static bool equals(const Try& t1, const Try& t2) { - return t1.value() == t2.value(); - } - }; -} +template +struct TryEquals { + static bool equals(const Try& t1, const Try& t2) { + return t1.value() == t2.value(); + } +}; +} // namespace detail +} // namespace futures template Future Future::willEqual(Future& f) { return collectAll(*this, f).then([](const std::tuple, Try>& t) { if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) { - return detail::TryEquals::equals(std::get<0>(t), std::get<1>(t)); + return futures::detail::TryEquals::equals( + std::get<0>(t), std::get<1>(t)); } else { return false; } @@ -1196,52 +1427,12 @@ Future Future::filter(F&& predicate) { return this->then([p = std::forward(predicate)](T val) { T const& valConstRef = val; if (!p(valConstRef)) { - throw PredicateDoesNotObtain(); + throwPredicateDoesNotObtain(); } return val; }); } -template -template -auto Future::thenMulti(Callback&& fn) - -> decltype(this->then(std::forward(fn))) { - // thenMulti with one callback is just a then - return then(std::forward(fn)); -} - -template -template -auto Future::thenMulti(Callback&& fn, Callbacks&&... fns) - -> decltype(this->then(std::forward(fn)). - thenMulti(std::forward(fns)...)) { - // thenMulti with two callbacks is just then(a).thenMulti(b, ...) - return then(std::forward(fn)). - thenMulti(std::forward(fns)...); -} - -template -template -auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn, - Callbacks&&... fns) - -> decltype(this->then(std::forward(fn)). - thenMulti(std::forward(fns)...)) { - // thenMultiExecutor with two callbacks is - // via(x).then(a).thenMulti(b, ...).via(oldX) - auto oldX = getExecutor(); - setExecutor(x); - return then(std::forward(fn)). - thenMulti(std::forward(fns)...).via(oldX); -} - -template -template -auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn) - -> decltype(this->then(std::forward(fn))) { - // thenMulti with one callback is just a then with an executor - return then(x, std::forward(fn)); -} - template inline Future when(bool p, F&& thunk) { return p ? std::forward(thunk)().unit() : makeFuture(); @@ -1281,232 +1472,6 @@ namespace futures { } } -namespace futures { - -namespace detail { - -struct retrying_policy_raw_tag {}; -struct retrying_policy_fut_tag {}; - -template -struct retrying_policy_traits { - using ew = exception_wrapper; - FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator()); - template - using has_op = typename std::integral_constant::value || - has_op_call::value>; - using is_raw = has_op; - using is_fut = has_op>; - using tag = typename std::conditional< - is_raw::value, retrying_policy_raw_tag, typename std::conditional< - is_fut::value, retrying_policy_fut_tag, void>::type>::type; -}; - -template -void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) { - using F = typename std::result_of::type; - using T = typename F::value_type; - auto f = ff(k++); - f.then([ - k, - prom = std::move(prom), - pm = std::forward(p), - ffm = std::forward(ff) - ](Try && t) mutable { - if (t.hasValue()) { - prom.setValue(std::move(t).value()); - return; - } - auto& x = t.exception(); - auto q = pm(k, x); - q.then([ - k, - prom = std::move(prom), - xm = std::move(x), - pm = std::move(pm), - ffm = std::move(ffm) - ](bool shouldRetry) mutable { - if (shouldRetry) { - retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom)); - } else { - prom.setException(std::move(xm)); - }; - }); - }); -} - -template -typename std::result_of::type -retrying(size_t k, Policy&& p, FF&& ff) { - using F = typename std::result_of::type; - using T = typename F::value_type; - auto prom = Promise(); - auto f = prom.getFuture(); - retryingImpl( - k, std::forward(p), std::forward(ff), std::move(prom)); - return f; -} - -template -typename std::result_of::type -retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) { - auto q = [pm = std::forward(p)](size_t k, exception_wrapper x) { - return makeFuture(pm(k, x)); - }; - return retrying(0, std::move(q), std::forward(ff)); -} - -template -typename std::result_of::type -retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) { - return retrying(0, std::forward(p), std::forward(ff)); -} - -// jittered exponential backoff, clamped to [backoff_min, backoff_max] -template -Duration retryingJitteredExponentialBackoffDur( - size_t n, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG& rng) { - using d = Duration; - auto dist = std::normal_distribution(0.0, jitter_param); - auto jitter = std::exp(dist(rng)); - auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1))); - return std::max(backoff_min, std::min(backoff_max, backoff)); -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p) { - return [ - pm = std::forward(p), - max_tries, - backoff_min, - backoff_max, - jitter_param, - rngp = std::forward(rng) - ](size_t n, const exception_wrapper& ex) mutable { - if (n == max_tries) { - return makeFuture(false); - } - return pm(n, ex).then( - [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ]( - bool v) mutable { - if (!v) { - return makeFuture(false); - } - auto backoff = detail::retryingJitteredExponentialBackoffDur( - n, backoff_min, backoff_max, jitter_param, rngp); - return futures::sleep(backoff).then([] { return true; }); - }); - }; -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p, - retrying_policy_raw_tag) { - auto q = [pm = std::forward(p)]( - size_t n, const exception_wrapper& e) { - return makeFuture(pm(n, e)); - }; - return retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - std::forward(rng), - std::move(q)); -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p, - retrying_policy_fut_tag) { - return retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - std::forward(rng), - std::forward(p)); -} -} - -template -typename std::result_of::type -retrying(Policy&& p, FF&& ff) { - using tag = typename detail::retrying_policy_traits::tag; - return detail::retrying(std::forward(p), std::forward(ff), tag()); -} - -inline -std::function -retryingPolicyBasic( - size_t max_tries) { - return [=](size_t n, const exception_wrapper&) { return n < max_tries; }; -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p) { - using tag = typename detail::retrying_policy_traits::tag; - return detail::retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - std::forward(rng), - std::forward(p), - tag()); -} - -inline -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param) { - auto p = [](size_t, const exception_wrapper&) { return true; }; - return retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - ThreadLocalPRNG(), - std::move(p)); -} - -} - // Instantiate the most common Future types to save compile time extern template class Future; extern template class Future; @@ -1514,5 +1479,4 @@ extern template class Future; extern template class Future; extern template class Future; extern template class Future; - } // namespace folly