X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Ffutures%2FFuture-inl.h;h=56b98410abcfe59d24cfcdbe89caecf08a0d8ba2;hp=003c9c896b62955a8c45bb7eff8b25ad8ee842a2;hb=ff18deaf720fbe59551a7ff275b09003a61c4351;hpb=f639b4542e084dd4cd5130000b63384a53f50973 diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index 003c9c89..56b98410 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2017-present Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,63 +13,230 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #pragma once +#include +#include #include #include -#include #include -#include +#include #include +#include +#include + +#ifndef FOLLY_FUTURE_USING_FIBER +#if FOLLY_MOBILE || defined(__APPLE__) +#define FOLLY_FUTURE_USING_FIBER 0 +#else +#define FOLLY_FUTURE_USING_FIBER 1 +#include +#endif +#endif namespace folly { class Timekeeper; +namespace futures { +namespace detail { +#if FOLLY_FUTURE_USING_FIBER +typedef folly::fibers::Baton FutureBatonType; +#else +typedef folly::Baton<> FutureBatonType; +#endif +} // namespace detail +} // namespace futures + +namespace detail { +std::shared_ptr getTimekeeperSingleton(); +} // namespace detail + +namespace futures { namespace detail { - Timekeeper* getTimekeeperSingleton(); +// Guarantees that the stored functor is destructed before the stored promise +// may be fulfilled. Assumes the stored functor to be noexcept-destructible. +template +class CoreCallbackState { + public: + template + CoreCallbackState(Promise&& promise, FF&& func) noexcept( + noexcept(F(std::declval()))) + : func_(std::forward(func)), promise_(std::move(promise)) { + assert(before_barrier()); + } + + CoreCallbackState(CoreCallbackState&& that) noexcept( + noexcept(F(std::declval()))) { + if (that.before_barrier()) { + new (&func_) F(std::move(that.func_)); + promise_ = that.stealPromise(); + } + } + + CoreCallbackState& operator=(CoreCallbackState&&) = delete; + + ~CoreCallbackState() { + if (before_barrier()) { + stealPromise(); + } + } + + template + auto invoke(Args&&... args) noexcept( + noexcept(std::declval()(std::declval()...))) { + assert(before_barrier()); + return std::move(func_)(std::forward(args)...); + } + + template + auto tryInvoke(Args&&... args) noexcept { + return makeTryWith([&] { return invoke(std::forward(args)...); }); + } + + void setTry(Try&& t) { + stealPromise().setTry(std::move(t)); + } + + void setException(exception_wrapper&& ew) { + stealPromise().setException(std::move(ew)); + } + + Promise stealPromise() noexcept { + assert(before_barrier()); + func_.~F(); + return std::move(promise_); + } + + private: + bool before_barrier() const noexcept { + return !promise_.isFulfilled(); + } + + union { + F func_; + }; + Promise promise_{Promise::makeEmpty()}; +}; + +template +inline auto makeCoreCallbackState(Promise&& p, F&& f) noexcept( + noexcept(CoreCallbackState>>( + std::declval&&>(), + std::declval()))) { + return CoreCallbackState>>( + std::move(p), std::forward(f)); } template -Future::Future(Future&& other) noexcept : core_(other.core_) { +FutureBase::FutureBase(SemiFuture&& other) noexcept : core_(other.core_) { other.core_ = nullptr; } template -Future& Future::operator=(Future&& other) noexcept { - std::swap(core_, other.core_); - return *this; +FutureBase::FutureBase(Future&& other) noexcept : core_(other.core_) { + other.core_ = nullptr; } template template -Future::Future(T2&& val) : core_(nullptr) { - Promise p; - p.setValue(std::forward(val)); - *this = p.getFuture(); -} +FutureBase::FutureBase(T2&& val) + : core_(new futures::detail::Core(Try(std::forward(val)))) {} template -template ::value, - int>::type> -Future::Future() : core_(nullptr) { - Promise p; - p.setValue(); - *this = p.getFuture(); +template +FutureBase::FutureBase( + typename std::enable_if::value>::type*) + : core_(new futures::detail::Core(Try(T()))) {} + +template +template < + class... Args, + typename std::enable_if::value, int>:: + type> +FutureBase::FutureBase(in_place_t, Args&&... args) + : core_( + new futures::detail::Core(in_place, std::forward(args)...)) { } +template +template +void FutureBase::assign(FutureType& other) noexcept { + std::swap(core_, other.core_); +} template -Future::~Future() { +FutureBase::~FutureBase() { detach(); } template -void Future::detach() { +T& FutureBase::value() & { + return result().value(); +} + +template +T const& FutureBase::value() const& { + return result().value(); +} + +template +T&& FutureBase::value() && { + return std::move(result().value()); +} + +template +T const&& FutureBase::value() const&& { + return std::move(result().value()); +} + +template +Try& FutureBase::result() & { + throwIfInvalid(); + + return core_->getTry(); +} + +template +Try const& FutureBase::result() const& { + throwIfInvalid(); + + return core_->getTry(); +} + +template +Try&& FutureBase::result() && { + throwIfInvalid(); + + return std::move(core_->getTry()); +} + +template +Try const&& FutureBase::result() const&& { + throwIfInvalid(); + + return std::move(core_->getTry()); +} + +template +bool FutureBase::isReady() const { + throwIfInvalid(); + return core_->ready(); +} + +template +bool FutureBase::hasValue() { + return core_->getTry().hasValue(); +} + +template +bool FutureBase::hasException() { + return core_->getTry().hasException(); +} + +template +void FutureBase::detach() { if (core_) { core_->detachFuture(); core_ = nullptr; @@ -77,30 +244,37 @@ void Future::detach() { } template -void Future::throwIfInvalid() const { - if (!core_) - throw NoState(); +void FutureBase::throwIfInvalid() const { + if (!core_) { + throwNoState(); + } } template -template -void Future::setCallback_(F&& func) { - throwIfInvalid(); - core_->setCallback(std::move(func)); +Optional> FutureBase::poll() { + Optional> o; + if (core_->ready()) { + o = std::move(core_->getTry()); + } + return o; } -// unwrap +template +void FutureBase::raise(exception_wrapper exception) { + core_->raise(std::move(exception)); +} template template -typename std::enable_if::value, - Future::Inner>>::type -Future::unwrap() { - return then([](Future::Inner> internal_future) { - return internal_future; - }); +void FutureBase::setCallback_(F&& func) { + throwIfInvalid(); + core_->setCallback(std::forward(func)); } +template +FutureBase::FutureBase(futures::detail::EmptyConstruct) noexcept + : core_(nullptr) {} + // then // Variant: returns a value @@ -108,21 +282,20 @@ Future::unwrap() { template template typename std::enable_if::type -Future::thenImplementation(F func, detail::argResult) { +FutureBase::thenImplementation( + F&& func, + futures::detail::argResult) { static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument"); typedef typename R::ReturnsFuture::Inner B; - throwIfInvalid(); + this->throwIfInvalid(); - // wrap these so we can move them into the lambda - folly::MoveWrapper> p; - folly::MoveWrapper funcm(std::forward(func)); + Promise p; + p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler()); // grab the Future now before we lose our handle on the Promise - auto f = p->getFuture(); - if (getExecutor()) { - f.setExecutor(getExecutor()); - } + auto f = p.getFuture(); + f.core_->setExecutorNoLock(this->getExecutor()); /* This is a bit tricky. @@ -143,30 +316,27 @@ Future::thenImplementation(F func, detail::argResult) { persist beyond the callback, if it gets moved), and so it is an optimization to just make it shared from the get-go. - We have to move in the Promise and func using the MoveWrapper - hack. (func could be copied but it's a big drag on perf). - - Two subtle but important points about this design. detail::Core has no - back pointers to Future or Promise, so if Future or Promise get moved - (and they will be moved in performant code) we don't have to do + Two subtle but important points about this design. futures::detail::Core + has no back pointers to Future or Promise, so if Future or Promise get + moved (and they will be moved in performant code) we don't have to do anything fancy. And because we store the continuation in the - detail::Core, not in the Future, we can execute the continuation even - after the Future has gone out of scope. This is an intentional design + futures::detail::Core, not in the Future, we can execute the continuation + even after the Future has gone out of scope. This is an intentional design decision. It is likely we will want to be able to cancel a continuation in some circumstances, but I think it should be explicit not implicit in the destruction of the Future used to create it. */ - setCallback_( - [p, funcm](Try&& t) mutable { - if (!isTry && t.hasException()) { - p->setException(std::move(t.exception())); - } else { - p->setWith([&]() { - return (*funcm)(t.template get()...); - }); - } - }); + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { + if (!isTry && t.hasException()) { + state.setException(std::move(t.exception())); + } else { + state.setTry(makeTryWith( + [&] { return state.invoke(t.template get()...); })); + } + }); return f; } @@ -175,99 +345,345 @@ Future::thenImplementation(F func, detail::argResult) { template template typename std::enable_if::type -Future::thenImplementation(F func, detail::argResult) { +FutureBase::thenImplementation( + F&& func, + futures::detail::argResult) { static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument"); typedef typename R::ReturnsFuture::Inner B; + this->throwIfInvalid(); - throwIfInvalid(); - - // wrap these so we can move them into the lambda - folly::MoveWrapper> p; - folly::MoveWrapper funcm(std::forward(func)); + Promise p; + p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler()); // grab the Future now before we lose our handle on the Promise - auto f = p->getFuture(); - if (getExecutor()) { - f.setExecutor(getExecutor()); - } + auto f = p.getFuture(); + f.core_->setExecutorNoLock(this->getExecutor()); - setCallback_( - [p, funcm](Try&& t) mutable { - if (!isTry && t.hasException()) { - p->setException(std::move(t.exception())); - } else { - try { - auto f2 = (*funcm)(t.template get()...); - // that didn't throw, now we can steal p - f2.setCallback_([p](Try&& b) mutable { - p->setTry(std::move(b)); - }); - } catch (const std::exception& e) { - p->setException(exception_wrapper(std::current_exception(), e)); - } catch (...) { - p->setException(exception_wrapper(std::current_exception())); + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { + if (!isTry && t.hasException()) { + state.setException(std::move(t.exception())); + } else { + auto tf2 = state.tryInvoke(t.template get()...); + if (tf2.hasException()) { + state.setException(std::move(tf2.exception())); + } else { + tf2->setCallback_([p = state.stealPromise()](Try && b) mutable { + p.setTry(std::move(b)); + }); + } } - } - }); + }); return f; } +} // namespace detail +} // namespace futures + +template +SemiFuture::type> makeSemiFuture(T&& t) { + return makeSemiFuture(Try::type>(std::forward(t))); +} + +// makeSemiFutureWith(SemiFuture()) -> SemiFuture +template +typename std::enable_if< + isSemiFuture::type>::value, + typename std::result_of::type>::type +makeSemiFutureWith(F&& func) { + using InnerType = + typename isSemiFuture::type>::Inner; + try { + return std::forward(func)(); + } catch (std::exception& e) { + return makeSemiFuture( + exception_wrapper(std::current_exception(), e)); + } catch (...) { + return makeSemiFuture( + exception_wrapper(std::current_exception())); + } +} + +// makeSemiFutureWith(T()) -> SemiFuture +// makeSemiFutureWith(void()) -> SemiFuture +template +typename std::enable_if< + !(isSemiFuture::type>::value), + SemiFuture::type>>>::type +makeSemiFutureWith(F&& func) { + using LiftedResult = Unit::LiftT::type>; + return makeSemiFuture( + makeTryWith([&func]() mutable { return std::forward(func)(); })); +} + +template +SemiFuture makeSemiFuture(std::exception_ptr const& e) { + return makeSemiFuture(Try(e)); +} + +template +SemiFuture makeSemiFuture(exception_wrapper ew) { + return makeSemiFuture(Try(std::move(ew))); +} + +template +typename std:: + enable_if::value, SemiFuture>::type + makeSemiFuture(E const& e) { + return makeSemiFuture(Try(make_exception_wrapper(e))); +} + +template +SemiFuture makeSemiFuture(Try&& t) { + return SemiFuture(new futures::detail::Core(std::move(t))); +} + +// This must be defined after the constructors to avoid a bug in MSVC +// https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244 +inline SemiFuture makeSemiFuture() { + return makeSemiFuture(Unit{}); +} + +template +SemiFuture SemiFuture::makeEmpty() { + return SemiFuture(futures::detail::EmptyConstruct{}); +} + +template +SemiFuture::SemiFuture(SemiFuture&& other) noexcept + : futures::detail::FutureBase(std::move(other)) {} + +template +SemiFuture::SemiFuture(Future&& other) noexcept + : futures::detail::FutureBase(std::move(other)) { + // SemiFuture should not have an executor on construction + if (this->core_) { + this->setExecutor(nullptr); + } +} + +template +SemiFuture& SemiFuture::operator=(SemiFuture&& other) noexcept { + this->assign(other); + return *this; +} + +template +SemiFuture& SemiFuture::operator=(Future&& other) noexcept { + this->assign(other); + // SemiFuture should not have an executor on construction + if (this->core_) { + this->setExecutor(nullptr); + } + return *this; +} + +template +void SemiFuture::boost_() { + // If a SemiFuture has an executor it should be deferred, so boost it + if (auto e = this->getExecutor()) { + // We know in a SemiFuture that if we have an executor it should be + // DeferredExecutor. Verify this in debug mode. + DCHECK(nullptr != dynamic_cast(e)); + + auto ka = static_cast(e)->getKeepAliveToken(); + static_cast(e)->boost(); + } +} + +template +inline Future SemiFuture::via(Executor* executor, int8_t priority) && { + throwIfInvalid(); + if (!executor) { + throwNoExecutor(); + } + + // If current executor is deferred, boost block to ensure that work + // progresses and is run on the new executor. + auto oldExecutor = this->getExecutor(); + if (oldExecutor && executor && (executor != oldExecutor)) { + // We know in a SemiFuture that if we have an executor it should be + // DeferredExecutor. Verify this in debug mode. + DCHECK(nullptr != dynamic_cast(this->getExecutor())); + if (static_cast(oldExecutor)) { + executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() { + static_cast(oldExecutorKA.get())->boost(); + }); + } + } + + this->setExecutor(executor, priority); + + auto newFuture = Future(this->core_); + this->core_ = nullptr; + return newFuture; +} + +template +template +SemiFuture::Return::value_type> +SemiFuture::defer(F&& func) && { + // If we already have a deferred executor, use it, otherwise create one + auto defKeepAlive = this->getExecutor() + ? this->getExecutor()->getKeepAliveToken() + : DeferredExecutor::create(); + auto e = defKeepAlive.get(); + // We know in a SemiFuture that if we have an executor it should be + // DeferredExecutor (either it was that way before, or we just created it). + // Verify this in debug mode. + DCHECK(nullptr != dynamic_cast(e)); + // Convert to a folly::future with a deferred executor + // Will be low-cost if this is not a new executor as via optimises for that + // case + auto sf = + std::move(*this) + .via(e) + // Then add the work, with a wrapper function that captures the + // keepAlive so the executor is destroyed at the right time. + .then( + DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func))) + // Finally, convert back o a folly::SemiFuture to hide the executor + .semi(); + // Carry deferred executor through chain as constructor from Future will + // nullify it + sf.setExecutor(e); + return sf; +} + +template +Future Future::makeEmpty() { + return Future(futures::detail::EmptyConstruct{}); +} + +template +Future::Future(Future&& other) noexcept + : futures::detail::FutureBase(std::move(other)) {} + +template +Future& Future::operator=(Future&& other) noexcept { + this->assign(other); + return *this; +} + +template +template < + class T2, + typename std::enable_if< + !std::is_same::type>::value && + std::is_constructible::value && + std::is_convertible::value, + int>::type> +Future::Future(Future&& other) + : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {} + +template +template < + class T2, + typename std::enable_if< + !std::is_same::type>::value && + std::is_constructible::value && + !std::is_convertible::value, + int>::type> +Future::Future(Future&& other) + : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {} + +template +template < + class T2, + typename std::enable_if< + !std::is_same::type>::value && + std::is_constructible::value, + int>::type> +Future& Future::operator=(Future&& other) { + return operator=( + std::move(other).then([](T2&& v) { return T(std::move(v)); })); +} + +// unwrap + +template +template +typename std:: + enable_if::value, Future::Inner>>::type + Future::unwrap() { + return then([](Future::Inner> internal_future) { + return internal_future; + }); +} + +template +inline Future Future::via(Executor* executor, int8_t priority) && { + this->throwIfInvalid(); + + this->setExecutor(executor, priority); + + auto newFuture = Future(this->core_); + this->core_ = nullptr; + return newFuture; +} + +template +inline Future Future::via(Executor* executor, int8_t priority) & { + this->throwIfInvalid(); + Promise p; + auto f = p.getFuture(); + auto func = [p = std::move(p)](Try&& t) mutable { + p.setTry(std::move(t)); + }; + using R = futures::detail::callableResult; + this->template thenImplementation( + std::move(func), typename R::Arg()); + return std::move(f).via(executor, priority); +} template template Future::Inner> Future::then(R(Caller::*func)(Args...), Caller *instance) { - typedef typename std::remove_cv< - typename std::remove_reference< - typename detail::ArgType::FirstArg>::type>::type FirstArg; + typedef typename std::remove_cv::FirstArg>::type>::type + FirstArg; + return then([instance, func](Try&& t){ return (instance->*func)(t.template get::value, Args>()...); }); } template -template -auto Future::then(Executor* x, Arg&& arg, Args&&... args) - -> decltype(this->then(std::forward(arg), - std::forward(args)...)) -{ - auto oldX = getExecutor(); - setExecutor(x); - return this->then(std::forward(arg), std::forward(args)...). - via(oldX); -} - -template -Future Future::then() { - return then([] (Try&& t) {}); +Future Future::then() { + return then([] () {}); } // onError where the callback returns T template template typename std::enable_if< - !detail::callableWith::value && - !detail::Extract::ReturnsFuture::value, - Future>::type + !futures::detail::callableWith::value && + !futures::detail::callableWith::value && + !futures::detail::Extract::ReturnsFuture::value, + Future>::type Future::onError(F&& func) { - typedef typename detail::Extract::FirstArg Exn; + typedef std::remove_reference_t< + typename futures::detail::Extract::FirstArg> + Exn; static_assert( - std::is_same::RawReturn, T>::value, + std::is_same::RawReturn, T>::value, "Return type of onError callback must be T or Future"); Promise p; + p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler()); auto f = p.getFuture(); - auto pm = folly::makeMoveWrapper(std::move(p)); - auto funcm = folly::makeMoveWrapper(std::move(func)); - setCallback_([pm, funcm](Try&& t) mutable { - if (!t.template withException([&] (Exn& e) { - pm->setWith([&]{ - return (*funcm)(e); - }); - })) { - pm->setTry(std::move(t)); - } - }); + + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { + if (auto e = t.template tryGetExceptionObject()) { + state.setTry(makeTryWith([&] { return state.invoke(*e); })); + } else { + state.setTry(std::move(t)); + } + }); return f; } @@ -276,45 +692,47 @@ Future::onError(F&& func) { template template typename std::enable_if< - !detail::callableWith::value && - detail::Extract::ReturnsFuture::value, - Future>::type + !futures::detail::callableWith::value && + !futures::detail::callableWith::value && + futures::detail::Extract::ReturnsFuture::value, + Future>::type Future::onError(F&& func) { static_assert( - std::is_same::Return, Future>::value, + std::is_same::Return, Future>:: + value, "Return type of onError callback must be T or Future"); - typedef typename detail::Extract::FirstArg Exn; + typedef std::remove_reference_t< + typename futures::detail::Extract::FirstArg> + Exn; Promise p; auto f = p.getFuture(); - auto pm = folly::makeMoveWrapper(std::move(p)); - auto funcm = folly::makeMoveWrapper(std::move(func)); - setCallback_([pm, funcm](Try&& t) mutable { - if (!t.template withException([&] (Exn& e) { - try { - auto f2 = (*funcm)(e); - f2.setCallback_([pm](Try&& t2) mutable { - pm->setTry(std::move(t2)); + + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { + if (auto e = t.template tryGetExceptionObject()) { + auto tf2 = state.tryInvoke(*e); + if (tf2.hasException()) { + state.setException(std::move(tf2.exception())); + } else { + tf2->setCallback_([p = state.stealPromise()](Try && t3) mutable { + p.setTry(std::move(t3)); }); - } catch (const std::exception& e2) { - pm->setException(exception_wrapper(std::current_exception(), e2)); - } catch (...) { - pm->setException(exception_wrapper(std::current_exception())); } - })) { - pm->setTry(std::move(t)); - } - }); + } else { + state.setTry(std::move(t)); + } + }); return f; } template template -Future Future::ensure(F func) { - MoveWrapper funcw(std::move(func)); - return this->then([funcw](Try&& t) { - (*funcw)(); +Future Future::ensure(F&& func) { + return this->then([funcw = std::forward(func)](Try && t) mutable { + std::move(funcw)(); return makeFuture(std::move(t)); }); } @@ -322,42 +740,40 @@ Future Future::ensure(F func) { template template Future Future::onTimeout(Duration dur, F&& func, Timekeeper* tk) { - auto funcw = folly::makeMoveWrapper(std::forward(func)); - return within(dur, tk) - .onError([funcw](TimedOut const&) { return (*funcw)(); }); + return within(dur, tk).onError([funcw = std::forward(func)]( + TimedOut const&) { return std::move(funcw)(); }); } template template typename std::enable_if< - detail::callableWith::value && - detail::Extract::ReturnsFuture::value, - Future>::type + futures::detail::callableWith::value && + futures::detail::Extract::ReturnsFuture::value, + Future>::type Future::onError(F&& func) { static_assert( - std::is_same::Return, Future>::value, + std::is_same::Return, Future>:: + value, "Return type of onError callback must be T or Future"); Promise p; auto f = p.getFuture(); - auto pm = folly::makeMoveWrapper(std::move(p)); - auto funcm = folly::makeMoveWrapper(std::move(func)); - setCallback_([pm, funcm](Try t) mutable { - if (t.hasException()) { - try { - auto f2 = (*funcm)(std::move(t.exception())); - f2.setCallback_([pm](Try t2) mutable { - pm->setTry(std::move(t2)); - }); - } catch (const std::exception& e2) { - pm->setException(exception_wrapper(std::current_exception(), e2)); - } catch (...) { - pm->setException(exception_wrapper(std::current_exception())); - } - } else { - pm->setTry(std::move(t)); - } - }); + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try t) mutable { + if (t.hasException()) { + auto tf2 = state.tryInvoke(std::move(t.exception())); + if (tf2.hasException()) { + state.setException(std::move(tf2.exception())); + } else { + tf2->setCallback_([p = state.stealPromise()](Try && t3) mutable { + p.setTry(std::move(t3)); + }); + } + } else { + state.setTry(std::move(t)); + } + }); return f; } @@ -366,168 +782,103 @@ Future::onError(F&& func) { template template typename std::enable_if< - detail::callableWith::value && - !detail::Extract::ReturnsFuture::value, - Future>::type + futures::detail::callableWith::value && + !futures::detail::Extract::ReturnsFuture::value, + Future>::type Future::onError(F&& func) { static_assert( - std::is_same::Return, Future>::value, + std::is_same::Return, Future>:: + value, "Return type of onError callback must be T or Future"); Promise p; auto f = p.getFuture(); - auto pm = folly::makeMoveWrapper(std::move(p)); - auto funcm = folly::makeMoveWrapper(std::move(func)); - setCallback_([pm, funcm](Try t) mutable { - if (t.hasException()) { - pm->setWith([&]{ - return (*funcm)(std::move(t.exception())); + this->setCallback_( + [state = futures::detail::makeCoreCallbackState( + std::move(p), std::forward(func))](Try&& t) mutable { + if (t.hasException()) { + state.setTry(makeTryWith( + [&] { return state.invoke(std::move(t.exception())); })); + } else { + state.setTry(std::move(t)); + } }); - } else { - pm->setTry(std::move(t)); - } - }); return f; } -template -typename std::add_lvalue_reference::type Future::value() { - throwIfInvalid(); - - return core_->getTry().value(); -} - -template -typename std::add_lvalue_reference::type Future::value() const { - throwIfInvalid(); - - return core_->getTry().value(); -} - -template -Try& Future::getTry() { - throwIfInvalid(); - - return core_->getTry(); -} - -template -Optional> Future::poll() { - Optional> o; - if (core_->ready()) { - o = std::move(core_->getTry()); - } - return o; -} - -template -inline Future Future::via(Executor* executor) && { - throwIfInvalid(); - - setExecutor(executor); - - return std::move(*this); -} - -template -inline Future Future::via(Executor* executor) & { - throwIfInvalid(); - - MoveWrapper> p; - auto f = p->getFuture(); - then([p](Try&& t) mutable { p->setTry(std::move(t)); }); - return std::move(f).via(executor); -} - -template -bool Future::isReady() const { - throwIfInvalid(); - return core_->ready(); -} - -template -void Future::raise(exception_wrapper exception) { - core_->raise(std::move(exception)); +template +auto via(Executor* x, Func&& func) + -> Future()())>::Inner> { + // TODO make this actually more performant. :-P #7260175 + return via(x).then(std::forward(func)); } // makeFuture template Future::type> makeFuture(T&& t) { - Promise::type> p; - p.setValue(std::forward(t)); - return p.getFuture(); + return makeFuture(Try::type>(std::forward(t))); } -inline // for multiple translation units -Future makeFuture() { - Promise p; - p.setValue(); - return p.getFuture(); +inline Future makeFuture() { + return makeFuture(Unit{}); } +// makeFutureWith(Future()) -> Future template -auto makeFutureWith( - F&& func, - typename std::enable_if::value, bool>::type sdf) - -> Future { - Promise p; - p.setWith( - [&func]() { - return (func)(); - }); - return p.getFuture(); +typename std::enable_if::type>::value, + typename std::result_of::type>::type +makeFutureWith(F&& func) { + using InnerType = + typename isFuture::type>::Inner; + try { + return std::forward(func)(); + } catch (std::exception& e) { + return makeFuture( + exception_wrapper(std::current_exception(), e)); + } catch (...) { + return makeFuture(exception_wrapper(std::current_exception())); + } } +// makeFutureWith(T()) -> Future +// makeFutureWith(void()) -> Future template -auto makeFutureWith(F const& func) -> Future { - F copy = func; - return makeFutureWith(std::move(copy)); +typename std::enable_if< + !(isFuture::type>::value), + Future::type>>>::type +makeFutureWith(F&& func) { + using LiftedResult = Unit::LiftT::type>; + return makeFuture( + makeTryWith([&func]() mutable { return std::forward(func)(); })); } template Future makeFuture(std::exception_ptr const& e) { - Promise p; - p.setException(e); - return p.getFuture(); + return makeFuture(Try(e)); } template Future makeFuture(exception_wrapper ew) { - Promise p; - p.setException(std::move(ew)); - return p.getFuture(); + return makeFuture(Try(std::move(ew))); } template typename std::enable_if::value, Future>::type makeFuture(E const& e) { - Promise p; - p.setException(make_exception_wrapper(e)); - return p.getFuture(); + return makeFuture(Try(make_exception_wrapper(e))); } template Future makeFuture(Try&& t) { - Promise::type> p; - p.setTry(std::move(t)); - return p.getFuture(); -} - -template <> -inline Future makeFuture(Try&& t) { - if (t.hasException()) { - return makeFuture(std::move(t.exception())); - } else { - return makeFuture(); - } + return Future(new futures::detail::Core(std::move(t))); } // via -inline Future via(Executor* executor) { - return makeFuture().via(executor); +Future via(Executor* executor, int8_t priority) { + return makeFuture().via(executor, priority); } // mapSetCallback calls func(i, Try) when every future completes @@ -544,13 +895,13 @@ void mapSetCallback(InputIterator first, InputIterator last, F func) { // collectAll (variadic) template -typename detail::VariadicContext< - typename std::decay::type::value_type...>::type +typename futures::detail::CollectAllVariadicContext< + typename std::decay::type::value_type...>::type collectAll(Fs&&... fs) { - auto ctx = std::make_shared::type::value_type...>>(); - detail::collectAllVariadicHelper(ctx, - std::forward::type>(fs)...); + auto ctx = std::make_shared::type::value_type...>>(); + futures::detail::collectVariadicHelper< + futures::detail::CollectAllVariadicContext>(ctx, std::forward(fs)...); return ctx->p.getFuture(); } @@ -565,7 +916,7 @@ collectAll(InputIterator first, InputIterator last) { typename std::iterator_traits::value_type::value_type T; struct CollectAllContext { - CollectAllContext(int n) : results(n) {} + CollectAllContext(size_t n) : results(n) {} ~CollectAllContext() { p.setValue(std::move(results)); } @@ -573,18 +924,24 @@ collectAll(InputIterator first, InputIterator last) { std::vector> results; }; - auto ctx = std::make_shared(std::distance(first, last)); + auto ctx = + std::make_shared(size_t(std::distance(first, last))); mapSetCallback(first, last, [ctx](size_t i, Try&& t) { ctx->results[i] = std::move(t); }); return ctx->p.getFuture(); } +// collect (iterator) + +namespace futures { namespace detail { template struct CollectContext { - struct Nothing { explicit Nothing(int n) {} }; + struct Nothing { + explicit Nothing(int /* n */) {} + }; using Result = typename std::conditional< std::is_void::value, @@ -596,7 +953,7 @@ struct CollectContext { Nothing, std::vector>>::type; - explicit CollectContext(int n) : result(n) {} + explicit CollectContext(size_t n) : result(n) {} ~CollectContext() { if (!threw.exchange(true)) { // map Optional -> T @@ -613,28 +970,21 @@ struct CollectContext { } Promise p; InternalResult result; - std::atomic threw; + std::atomic threw {false}; }; -// Specialize for void (implementations in Future.cpp) - -template <> -CollectContext::~CollectContext(); - -template <> -void CollectContext::setPartialResult(size_t i, Try& t); - -} +} // namespace detail +} // namespace futures template -Future::value_type::value_type>::Result> +Future::value_type::value_type>::Result> collect(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - auto ctx = std::make_shared>( - std::distance(first, last)); + auto ctx = std::make_shared>( + std::distance(first, last)); mapSetCallback(first, last, [ctx](size_t i, Try&& t) { if (t.hasException()) { if (!ctx->threw.exchange(true)) { @@ -647,6 +997,21 @@ collect(InputIterator first, InputIterator last) { return ctx->p.getFuture(); } +// collect (variadic) + +template +typename futures::detail::CollectVariadicContext< + typename std::decay::type::value_type...>::type +collect(Fs&&... fs) { + auto ctx = std::make_shared::type::value_type...>>(); + futures::detail::collectVariadicHelper< + futures::detail::CollectVariadicContext>(ctx, std::forward(fs)...); + return ctx->p.getFuture(); +} + +// collectAny (iterator) + template Future< std::pair::value_type::value_type T; struct CollectAnyContext { - CollectAnyContext(size_t n) : done(false) {}; + CollectAnyContext() {} Promise>> p; - std::atomic done; + std::atomic done {false}; }; - auto ctx = std::make_shared(std::distance(first, last)); + auto ctx = std::make_shared(); mapSetCallback(first, last, [ctx](size_t i, Try&& t) { if (!ctx->done.exchange(true)) { ctx->p.setValue(std::make_pair(i, std::move(t))); @@ -672,6 +1037,39 @@ collectAny(InputIterator first, InputIterator last) { return ctx->p.getFuture(); } +// collectAnyWithoutException (iterator) + +template +Future::value_type::value_type>> +collectAnyWithoutException(InputIterator first, InputIterator last) { + typedef + typename std::iterator_traits::value_type::value_type T; + + struct CollectAnyWithoutExceptionContext { + CollectAnyWithoutExceptionContext(){} + Promise> p; + std::atomic done{false}; + std::atomic nFulfilled{0}; + size_t nTotal; + }; + + auto ctx = std::make_shared(); + ctx->nTotal = size_t(std::distance(first, last)); + + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + if (!t.hasException() && !ctx->done.exchange(true)) { + ctx->p.setValue(std::make_pair(i, std::move(t.value()))); + } else if (++ctx->nFulfilled == ctx->nTotal) { + ctx->p.setException(t.exception()); + } + }); + return ctx->p.getFuture(); +} + +// collectN (iterator) + template Future::value_type::value_type>>>> @@ -687,7 +1085,7 @@ collectN(InputIterator first, InputIterator last, size_t n) { }; auto ctx = std::make_shared(); - if (std::distance(first, last) < n) { + if (size_t(std::distance(first, last)) < n) { ctx->p.setException(std::runtime_error("Not enough futures")); } else { // for each completed Future, increase count and add to vector, until we @@ -697,7 +1095,7 @@ collectN(InputIterator first, InputIterator last, size_t n) { auto c = ++ctx->completed; if (c <= n) { assert(ctx->v.size() < n); - ctx->v.push_back(std::make_pair(i, std::move(t))); + ctx->v.emplace_back(i, std::move(t)); if (c == n) { ctx->p.setTry(Try(std::move(ctx->v))); } @@ -708,6 +1106,8 @@ collectN(InputIterator first, InputIterator last, size_t n) { return ctx->p.getFuture(); } +// reduce (iterator) + template Future reduce(It first, It last, T&& initial, F&& func) { if (first == last) { @@ -716,16 +1116,18 @@ Future reduce(It first, It last, T&& initial, F&& func) { typedef typename std::iterator_traits::value_type::value_type ItT; typedef typename std::conditional< - detail::callableWith&&>::value, Try, ItT>::type Arg; + futures::detail::callableWith&&>::value, + Try, + ItT>::type Arg; typedef isTry IsTry; - folly::MoveWrapper minitial(std::move(initial)); auto sfunc = std::make_shared(std::move(func)); - auto f = first->then([minitial, sfunc](Try& head) mutable { - return (*sfunc)(std::move(*minitial), - head.template get()); - }); + auto f = first->then( + [ minitial = std::move(initial), sfunc ](Try & head) mutable { + return (*sfunc)( + std::move(minitial), head.template get()); + }); for (++first; first != last; ++first) { f = collectAll(f, *first).then([sfunc](std::tuple, Try>& t) { @@ -739,20 +1141,138 @@ Future reduce(It first, It last, T&& initial, F&& func) { return f; } +// window (collection) + +template +std::vector> +window(Collection input, F func, size_t n) { + // Use global inline executor singleton + auto executor = &InlineExecutor::instance(); + return window(executor, std::move(input), std::move(func), n); +} + +template +std::vector> +window(Executor* executor, Collection input, F func, size_t n) { + struct WindowContext { + WindowContext(Executor* executor_, Collection&& input_, F&& func_) + : executor(executor_), + input(std::move(input_)), + promises(input.size()), + func(std::move(func_)) {} + std::atomic i{0}; + Executor* executor; + Collection input; + std::vector> promises; + F func; + + static inline void spawn(std::shared_ptr ctx) { + size_t i = ctx->i++; + if (i < ctx->input.size()) { + auto fut = ctx->func(std::move(ctx->input[i])); + fut.setCallback_([ctx = std::move(ctx), i](Try&& t) mutable { + const auto executor_ = ctx->executor; + executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable { + ctx->promises[i].setTry(std::move(t)); + // Chain another future onto this one + spawn(std::move(ctx)); + }); + }); + } + } + }; + + auto max = std::min(n, input.size()); + + auto ctx = std::make_shared( + executor, std::move(input), std::move(func)); + + // Start the first n Futures + for (size_t i = 0; i < max; ++i) { + executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); }); + } + + std::vector> futures; + futures.reserve(ctx->promises.size()); + for (auto& promise : ctx->promises) { + futures.emplace_back(promise.getFuture()); + } + + return futures; +} + +// reduce + template template Future Future::reduce(I&& initial, F&& func) { - folly::MoveWrapper minitial(std::move(initial)); - folly::MoveWrapper mfunc(std::move(func)); - return then([minitial, mfunc](T& vals) mutable { - auto ret = std::move(*minitial); + return then([ + minitial = std::forward(initial), + mfunc = std::forward(func) + ](T& vals) mutable { + auto ret = std::move(minitial); for (auto& val : vals) { - ret = (*mfunc)(std::move(ret), std::move(val)); + ret = mfunc(std::move(ret), std::move(val)); } return ret; }); } +// unorderedReduce (iterator) + +template +Future unorderedReduce(It first, It last, T initial, F func) { + if (first == last) { + return makeFuture(std::move(initial)); + } + + typedef isTry IsTry; + + struct UnorderedReduceContext { + UnorderedReduceContext(T&& memo, F&& fn, size_t n) + : lock_(), memo_(makeFuture(std::move(memo))), + func_(std::move(fn)), numThens_(0), numFutures_(n), promise_() + {} + folly::MicroSpinLock lock_; // protects memo_ and numThens_ + Future memo_; + F func_; + size_t numThens_; // how many Futures completed and called .then() + size_t numFutures_; // how many Futures in total + Promise promise_; + }; + + auto ctx = std::make_shared( + std::move(initial), std::move(func), std::distance(first, last)); + + mapSetCallback( + first, + last, + [ctx](size_t /* i */, Try&& t) { + // Futures can be completed in any order, simultaneously. + // To make this non-blocking, we create a new Future chain in + // the order of completion to reduce the values. + // The spinlock just protects chaining a new Future, not actually + // executing the reduce, which should be really fast. + folly::MSLGuard lock(ctx->lock_); + ctx->memo_ = + ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable { + // Either return a ItT&& or a Try&& depending + // on the type of the argument of func. + return ctx->func_(std::move(v), + mt.template get()); + }); + if (++ctx->numThens_ == ctx->numFutures_) { + // After reducing the value of the last Future, fulfill the Promise + ctx->memo_.setCallback_( + [ctx](Try&& t2) { ctx->promise_.setValue(std::move(t2)); }); + } + }); + + return ctx->promise_.getFuture(); +} + +// within + template Future Future::within(Duration dur, Timekeeper* tk) { return within(dur, TimedOut(), tk); @@ -763,131 +1283,256 @@ template Future Future::within(Duration dur, E e, Timekeeper* tk) { struct Context { - Context(E ex) : exception(std::move(ex)), promise(), token(false) {} + Context(E ex) : exception(std::move(ex)), promise() {} E exception; + Future thisFuture; Promise promise; - std::atomic token; + std::atomic token {false}; }; - auto ctx = std::make_shared(std::move(e)); - if (!tk) { - tk = folly::detail::getTimekeeperSingleton(); + if (this->isReady()) { + return std::move(*this); } - tk->after(dur) - .then([ctx](Try const& t) { - if (ctx->token.exchange(true) == false) { - if (t.hasException()) { - ctx->promise.setException(std::move(t.exception())); - } else { - ctx->promise.setException(std::move(ctx->exception)); - } - } - }); + std::shared_ptr tks; + if (LIKELY(!tk)) { + tks = folly::detail::getTimekeeperSingleton(); + tk = tks.get(); + } + + if (UNLIKELY(!tk)) { + return makeFuture(NoTimekeeper()); + } + + auto ctx = std::make_shared(std::move(e)); - this->then([ctx](Try&& t) { + ctx->thisFuture = this->then([ctx](Try&& t) mutable { if (ctx->token.exchange(true) == false) { ctx->promise.setTry(std::move(t)); } }); - return ctx->promise.getFuture(); + // Have time keeper use a weak ptr to hold ctx, + // so that ctx can be deallocated as soon as the future job finished. + tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try const& t) mutable { + auto lockedCtx = weakCtx.lock(); + if (!lockedCtx) { + // ctx already released. "this" completed first, cancel "after" + return; + } + // "after" completed first, cancel "this" + lockedCtx->thisFuture.raise(TimedOut()); + if (lockedCtx->token.exchange(true) == false) { + if (t.hasException()) { + lockedCtx->promise.setException(std::move(t.exception())); + } else { + lockedCtx->promise.setException(std::move(lockedCtx->exception)); + } + } + }); + + return ctx->promise.getFuture().via(this->getExecutor()); } +// delayed + template Future Future::delayed(Duration dur, Timekeeper* tk) { return collectAll(*this, futures::sleep(dur, tk)) - .then([](std::tuple, Try> tup) { - Try& t = std::get<0>(tup); - return makeFuture(std::move(t)); - }); + .then([](std::tuple, Try> tup) { + Try& t = std::get<0>(tup); + return makeFuture(std::move(t)); + }); } +namespace futures { namespace detail { template -void waitImpl(Future& f) { - // short-circuit if there's nothing to do - if (f.isReady()) return; +void doBoost(folly::Future& /* usused */) {} - folly::fibers::Baton baton; - f = f.then([&](Try t) { - baton.post(); - return makeFuture(std::move(t)); - }); - baton.wait(); +template +void doBoost(folly::SemiFuture& f) { + f.boost_(); +} - // There's a race here between the return here and the actual finishing of - // the future. f is completed, but the setup may not have finished on done - // after the baton has posted. - while (!f.isReady()) { - std::this_thread::yield(); +template +void waitImpl(FutureType& f) { + // short-circuit if there's nothing to do + if (f.isReady()) { + return; } + + FutureBatonType baton; + f.setCallback_([&](const Try& /* t */) { baton.post(); }); + doBoost(f); + baton.wait(); + assert(f.isReady()); } -template -void waitImpl(Future& f, Duration dur) { +template +void waitImpl(FutureType& f, Duration dur) { // short-circuit if there's nothing to do - if (f.isReady()) return; + if (f.isReady()) { + return; + } - auto baton = std::make_shared(); - f = f.then([baton](Try t) { + Promise promise; + auto ret = promise.getFuture(); + auto baton = std::make_shared(); + f.setCallback_([baton, promise = std::move(promise)](Try&& t) mutable { + promise.setTry(std::move(t)); baton->post(); - return makeFuture(std::move(t)); }); - - // Let's preserve the invariant that if we did not timeout (timed_wait returns - // true), then the returned Future is complete when it is returned to the - // caller. We need to wait out the race for that Future to complete. - if (baton->timed_wait(dur)) { - while (!f.isReady()) { - std::this_thread::yield(); - } + doBoost(f); + f = std::move(ret); + if (baton->try_wait_for(dur)) { + assert(f.isReady()); } } template void waitViaImpl(Future& f, DrivableExecutor* e) { + // Set callback so to ensure that the via executor has something on it + // so that once the preceding future triggers this callback, drive will + // always have a callback to satisfy it + if (f.isReady()) { + return; + } + f = f.via(e).then([](T&& t) { return std::move(t); }); + while (!f.isReady()) { + e->drive(); + } + assert(f.isReady()); +} + +template +void waitViaImpl(SemiFuture& f, DrivableExecutor* e) { + // Set callback so to ensure that the via executor has something on it + // so that once the preceding future triggers this callback, drive will + // always have a callback to satisfy it + if (f.isReady()) { + return; + } + f = std::move(f).via(e).then([](T&& t) { return std::move(t); }); while (!f.isReady()) { e->drive(); } + assert(f.isReady()); +} + +} // namespace detail +} // namespace futures + +template +SemiFuture& SemiFuture::wait() & { + futures::detail::waitImpl(*this); + return *this; +} + +template +SemiFuture&& SemiFuture::wait() && { + futures::detail::waitImpl(*this); + return std::move(*this); +} + +template +SemiFuture& SemiFuture::wait(Duration dur) & { + futures::detail::waitImpl(*this, dur); + return *this; } -} // detail +template +SemiFuture&& SemiFuture::wait(Duration dur) && { + futures::detail::waitImpl(*this, dur); + return std::move(*this); +} + +template +SemiFuture& SemiFuture::waitVia(DrivableExecutor* e) & { + futures::detail::waitViaImpl(*this, e); + return *this; +} + +template +SemiFuture&& SemiFuture::waitVia(DrivableExecutor* e) && { + futures::detail::waitViaImpl(*this, e); + return std::move(*this); +} + +template +T SemiFuture::get() && { + return std::move(wait()).value(); +} + +template +T SemiFuture::get(Duration dur) && { + wait(dur); + if (this->isReady()) { + return std::move(this->value()); + } else { + throwTimedOut(); + } +} + +template +Try SemiFuture::getTry() && { + return std::move(wait()).result(); +} + +template +Try SemiFuture::getTry(Duration dur) && { + wait(dur); + if (this->isReady()) { + return std::move(this->result()); + } else { + throwTimedOut(); + } +} + +template +T SemiFuture::getVia(DrivableExecutor* e) && { + return std::move(waitVia(e)).value(); +} + +template +Try SemiFuture::getTryVia(DrivableExecutor* e) && { + return std::move(waitVia(e)).result(); +} template Future& Future::wait() & { - detail::waitImpl(*this); + futures::detail::waitImpl(*this); return *this; } template Future&& Future::wait() && { - detail::waitImpl(*this); + futures::detail::waitImpl(*this); return std::move(*this); } template Future& Future::wait(Duration dur) & { - detail::waitImpl(*this, dur); + futures::detail::waitImpl(*this, dur); return *this; } template Future&& Future::wait(Duration dur) && { - detail::waitImpl(*this, dur); + futures::detail::waitImpl(*this, dur); return std::move(*this); } template Future& Future::waitVia(DrivableExecutor* e) & { - detail::waitViaImpl(*this, e); + futures::detail::waitViaImpl(*this, e); return *this; } template Future&& Future::waitVia(DrivableExecutor* e) && { - detail::waitViaImpl(*this, e); + futures::detail::waitViaImpl(*this, e); return std::move(*this); } @@ -896,29 +1541,19 @@ T Future::get() { return std::move(wait().value()); } -template <> -inline void Future::get() { - wait().value(); -} - template T Future::get(Duration dur) { wait(dur); - if (isReady()) { - return std::move(value()); + if (this->isReady()) { + return std::move(this->value()); } else { - throw TimedOut(); + throwTimedOut(); } } -template <> -inline void Future::get(Duration dur) { - wait(dur); - if (isReady()) { - return; - } else { - throw TimedOut(); - } +template +Try& Future::getTry() { + return result(); } template @@ -926,32 +1561,28 @@ T Future::getVia(DrivableExecutor* e) { return std::move(waitVia(e).value()); } -template <> -inline void Future::getVia(DrivableExecutor* e) { - waitVia(e).value(); +template +Try& Future::getTryVia(DrivableExecutor* e) { + return waitVia(e).getTry(); } +namespace futures { namespace detail { - template - struct TryEquals { - static bool equals(const Try& t1, const Try& t2) { - return t1.value() == t2.value(); - } - }; - - template <> - struct TryEquals { - static bool equals(const Try& t1, const Try& t2) { - return true; - } - }; -} +template +struct TryEquals { + static bool equals(const Try& t1, const Try& t2) { + return t1.value() == t2.value(); + } +}; +} // namespace detail +} // namespace futures template Future Future::willEqual(Future& f) { return collectAll(*this, f).then([](const std::tuple, Try>& t) { if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) { - return detail::TryEquals::equals(std::get<0>(t), std::get<1>(t)); + return futures::detail::TryEquals::equals( + std::get<0>(t), std::get<1>(t)); } else { return false; } @@ -960,62 +1591,60 @@ Future Future::willEqual(Future& f) { template template -Future Future::filter(F predicate) { - auto p = folly::makeMoveWrapper(std::move(predicate)); - return this->then([p](T val) { +Future Future::filter(F&& predicate) { + return this->then([p = std::forward(predicate)](T val) { T const& valConstRef = val; - if (!(*p)(valConstRef)) { - throw PredicateDoesNotObtain(); + if (!p(valConstRef)) { + throwPredicateDoesNotObtain(); } return val; }); } -namespace futures { - namespace { - template - Future chainHelper(Future f) { - return f; - } - - template - Future chainHelper(F f, Fn fn, Callbacks... fns) { - return chainHelper(f.then(fn), fns...); - } +template +inline Future when(bool p, F&& thunk) { + return p ? std::forward(thunk)().unit() : makeFuture(); +} + +template +Future whileDo(P&& predicate, F&& thunk) { + if (predicate()) { + auto future = thunk(); + return future.then([ + predicate = std::forward

(predicate), + thunk = std::forward(thunk) + ]() mutable { + return whileDo(std::forward

(predicate), std::forward(thunk)); + }); } + return makeFuture(); +} - template - std::function(Try)> - chain(Callbacks... fns) { - MoveWrapper> pw; - MoveWrapper> fw(chainHelper(pw->getFuture(), fns...)); - return [=](Try t) mutable { - pw->setTry(std::move(t)); - return std::move(*fw); - }; - } +template +Future times(const int n, F&& thunk) { + return folly::whileDo( + [ n, count = std::make_unique>(0) ]() mutable { + return count->fetch_add(1) < n; + }, + std::forward(thunk)); +} - template - std::vector> map(It first, It last, F func) { - std::vector> results; - for (auto it = first; it != last; it++) { - results.push_back(it->then(func)); - } - return results; +namespace futures { +template +std::vector> map(It first, It last, F func) { + std::vector> results; + for (auto it = first; it != last; it++) { + results.push_back(it->then(func)); } + return results; } +} // namespace futures // Instantiate the most common Future types to save compile time -extern template class Future; +extern template class Future; extern template class Future; extern template class Future; extern template class Future; extern template class Future; extern template class Future; - } // namespace folly - -// I haven't included a Future specialization because I don't forsee us -// using it, however it is not difficult to add when needed. Refer to -// Future for guidance. std::future and boost::future code would also be -// instructive.