X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Ffutures%2FFuture-inl.h;h=091357ee5b23a796c14631cd2fcf001047b7e704;hb=a1614feea3f3c0beb75fb2dc43ec45b3e5d57223;hp=a6f35c26643fdfbbc9dacffdca5d3dd7c179d464;hpb=d8bc4210d2feacbe01a24cd4fe2ad38a9c966dc8;p=folly.git diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index a6f35c26..091357ee 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -1,5 +1,5 @@ /* - * Copyright 2014 Facebook, Inc. + * Copyright 2015 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,61 +16,61 @@ #pragma once +#include +#include #include +#include #include - #include +#include +#include +#include #include #include +#if defined(__ANDROID__) || defined(__APPLE__) +#define FOLLY_FUTURE_USING_FIBER 0 +#else +#define FOLLY_FUTURE_USING_FIBER 1 +#include +#endif + namespace folly { class Timekeeper; namespace detail { - Timekeeper* getTimekeeperSingleton(); +#if FOLLY_FUTURE_USING_FIBER +typedef folly::fibers::Baton FutureBatonType; +#else +typedef folly::Baton<> FutureBatonType; +#endif +} + +namespace detail { + std::shared_ptr getTimekeeperSingleton(); } template -Future::Future(Future&& other) noexcept : core_(nullptr) { - *this = std::move(other); +Future::Future(Future&& other) noexcept : core_(other.core_) { + other.core_ = nullptr; } template -Future& Future::operator=(Future&& other) { +Future& Future::operator=(Future&& other) noexcept { std::swap(core_, other.core_); return *this; } template -template -Future::Future( - const typename std::enable_if::value, F>::type& val) - : core_(nullptr) { - Promise p; - p.setValue(val); - *this = p.getFuture(); -} +template +Future::Future(T2&& val) + : core_(new detail::Core(Try(std::forward(val)))) {} template -template -Future::Future( - typename std::enable_if::value, F>::type&& val) - : core_(nullptr) { - Promise p; - p.setValue(std::forward(val)); - *this = p.getFuture(); -} - -template <> -template ::value, int>::type> -Future::Future() : core_(nullptr) { - Promise p; - p.setValue(); - *this = p.getFuture(); -} - +template +Future::Future() + : core_(new detail::Core(Try(T()))) {} template Future::~Future() { @@ -98,6 +98,20 @@ void Future::setCallback_(F&& func) { core_->setCallback(std::move(func)); } +// unwrap + +template +template +typename std::enable_if::value, + Future::Inner>>::type +Future::unwrap() { + return then([](Future::Inner> internal_future) { + return internal_future; + }); +} + +// then + // Variant: returns a value // e.g. f.then([](Try&& t){ return t.value(); }); template @@ -111,10 +125,12 @@ Future::thenImplementation(F func, detail::argResult) { // wrap these so we can move them into the lambda folly::MoveWrapper> p; + p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); folly::MoveWrapper funcm(std::forward(func)); // grab the Future now before we lose our handle on the Promise auto f = p->getFuture(); + f.core_->setExecutorNoLock(getExecutor()); /* This is a bit tricky. @@ -153,7 +169,7 @@ Future::thenImplementation(F func, detail::argResult) { if (!isTry && t.hasException()) { p->setException(std::move(t.exception())); } else { - p->fulfil([&]() { + p->setWith([&]() { return (*funcm)(t.template get()...); }); } @@ -175,10 +191,12 @@ Future::thenImplementation(F func, detail::argResult) { // wrap these so we can move them into the lambda folly::MoveWrapper> p; + p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); folly::MoveWrapper funcm(std::forward(func)); // grab the Future now before we lose our handle on the Promise auto f = p->getFuture(); + f.core_->setExecutorNoLock(getExecutor()); setCallback_( [p, funcm](Try&& t) mutable { @@ -189,9 +207,11 @@ Future::thenImplementation(F func, detail::argResult) { auto f2 = (*funcm)(t.template get()...); // that didn't throw, now we can steal p f2.setCallback_([p](Try&& b) mutable { - p->fulfilTry(std::move(b)); + p->setTry(std::move(b)); }); } catch (const std::exception& e) { + p->setException(exception_wrapper(std::current_exception(), e)); + } catch (...) { p->setException(exception_wrapper(std::current_exception())); } } @@ -213,14 +233,27 @@ Future::then(R(Caller::*func)(Args...), Caller *instance) { } template -Future Future::then() { - return then([] (Try&& t) {}); +template +auto Future::then(Executor* x, Arg&& arg, Args&&... args) + -> decltype(this->then(std::forward(arg), + std::forward(args)...)) +{ + auto oldX = getExecutor(); + setExecutor(x); + return this->then(std::forward(arg), std::forward(args)...). + via(oldX); +} + +template +Future Future::then() { + return then([] () {}); } // onError where the callback returns T template template typename std::enable_if< + !detail::callableWith::value && !detail::Extract::ReturnsFuture::value, Future>::type Future::onError(F&& func) { @@ -230,16 +263,17 @@ Future::onError(F&& func) { "Return type of onError callback must be T or Future"); Promise p; + p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); auto f = p.getFuture(); auto pm = folly::makeMoveWrapper(std::move(p)); auto funcm = folly::makeMoveWrapper(std::move(func)); setCallback_([pm, funcm](Try&& t) mutable { if (!t.template withException([&] (Exn& e) { - pm->fulfil([&]{ + pm->setWith([&]{ return (*funcm)(e); }); })) { - pm->fulfilTry(std::move(t)); + pm->setTry(std::move(t)); } }); @@ -250,6 +284,7 @@ Future::onError(F&& func) { template template typename std::enable_if< + !detail::callableWith::value && detail::Extract::ReturnsFuture::value, Future>::type Future::onError(F&& func) { @@ -267,7 +302,7 @@ Future::onError(F&& func) { try { auto f2 = (*funcm)(e); f2.setCallback_([pm](Try&& t2) mutable { - pm->fulfilTry(std::move(t2)); + pm->setTry(std::move(t2)); }); } catch (const std::exception& e2) { pm->setException(exception_wrapper(std::current_exception(), e2)); @@ -275,13 +310,23 @@ Future::onError(F&& func) { pm->setException(exception_wrapper(std::current_exception())); } })) { - pm->fulfilTry(std::move(t)); + pm->setTry(std::move(t)); } }); return f; } +template +template +Future Future::ensure(F func) { + MoveWrapper funcw(std::move(func)); + return this->then([funcw](Try&& t) mutable { + (*funcw)(); + return makeFuture(std::move(t)); + }); +} + template template Future Future::onTimeout(Duration dur, F&& func, Timekeeper* tk) { @@ -290,6 +335,70 @@ Future Future::onTimeout(Duration dur, F&& func, Timekeeper* tk) { .onError([funcw](TimedOut const&) { return (*funcw)(); }); } +template +template +typename std::enable_if< + detail::callableWith::value && + detail::Extract::ReturnsFuture::value, + Future>::type +Future::onError(F&& func) { + static_assert( + std::is_same::Return, Future>::value, + "Return type of onError callback must be T or Future"); + + Promise p; + auto f = p.getFuture(); + auto pm = folly::makeMoveWrapper(std::move(p)); + auto funcm = folly::makeMoveWrapper(std::move(func)); + setCallback_([pm, funcm](Try t) mutable { + if (t.hasException()) { + try { + auto f2 = (*funcm)(std::move(t.exception())); + f2.setCallback_([pm](Try t2) mutable { + pm->setTry(std::move(t2)); + }); + } catch (const std::exception& e2) { + pm->setException(exception_wrapper(std::current_exception(), e2)); + } catch (...) { + pm->setException(exception_wrapper(std::current_exception())); + } + } else { + pm->setTry(std::move(t)); + } + }); + + return f; +} + +// onError(exception_wrapper) that returns T +template +template +typename std::enable_if< + detail::callableWith::value && + !detail::Extract::ReturnsFuture::value, + Future>::type +Future::onError(F&& func) { + static_assert( + std::is_same::Return, Future>::value, + "Return type of onError callback must be T or Future"); + + Promise p; + auto f = p.getFuture(); + auto pm = folly::makeMoveWrapper(std::move(p)); + auto funcm = folly::makeMoveWrapper(std::move(func)); + setCallback_([pm, funcm](Try t) mutable { + if (t.hasException()) { + pm->setWith([&]{ + return (*funcm)(std::move(t.exception())); + }); + } else { + pm->setTry(std::move(t)); + } + }); + + return f; +} + template typename std::add_lvalue_reference::type Future::value() { throwIfInvalid(); @@ -312,25 +421,40 @@ Try& Future::getTry() { } template -template -inline Future Future::via(Executor* executor) && { +Optional> Future::poll() { + Optional> o; + if (core_->ready()) { + o = std::move(core_->getTry()); + } + return o; +} + +template +inline Future Future::via(Executor* executor, int8_t priority) && { throwIfInvalid(); - this->deactivate(); - core_->setExecutor(executor); + setExecutor(executor, priority); return std::move(*this); } template -template -inline Future Future::via(Executor* executor) & { +inline Future Future::via(Executor* executor, int8_t priority) & { throwIfInvalid(); MoveWrapper> p; auto f = p->getFuture(); - then([p](Try&& t) mutable { p->fulfilTry(std::move(t)); }); - return std::move(f).via(executor); + then([p](Try&& t) mutable { p->setTry(std::move(t)); }); + return std::move(f).via(executor, priority); +} + + +template +auto via(Executor* x, Func func) + -> Future::Inner> +{ + // TODO make this actually more performant. :-P #7260175 + return via(x).then(func); } template @@ -339,6 +463,16 @@ bool Future::isReady() const { return core_->ready(); } +template +bool Future::hasValue() { + return getTry().hasValue(); +} + +template +bool Future::hasException() { + return getTry().hasException(); +} + template void Future::raise(exception_wrapper exception) { core_->raise(std::move(exception)); @@ -348,296 +482,416 @@ void Future::raise(exception_wrapper exception) { template Future::type> makeFuture(T&& t) { - Promise::type> p; - p.setValue(std::forward(t)); - return p.getFuture(); + return makeFuture(Try::type>(std::forward(t))); } inline // for multiple translation units -Future makeFuture() { - Promise p; - p.setValue(); - return p.getFuture(); +Future makeFuture() { + return makeFuture(Unit{}); } +// makeFutureWith(Future()) -> Future template -auto makeFutureTry( - F&& func, - typename std::enable_if::value, bool>::type sdf) - -> Future { - Promise p; - p.fulfil( - [&func]() { - return (func)(); - }); - return p.getFuture(); +typename std::enable_if::type>::value, + typename std::result_of::type>::type +makeFutureWith(F&& func) { + using InnerType = + typename isFuture::type>::Inner; + try { + return func(); + } catch (std::exception& e) { + return makeFuture( + exception_wrapper(std::current_exception(), e)); + } catch (...) { + return makeFuture(exception_wrapper(std::current_exception())); + } } +// makeFutureWith(T()) -> Future +// makeFutureWith(void()) -> Future template -auto makeFutureTry(F const& func) -> Future { - F copy = func; - return makeFutureTry(std::move(copy)); +typename std::enable_if< + !(isFuture::type>::value), + Future::type>::type>>::type +makeFutureWith(F&& func) { + using LiftedResult = + typename Unit::Lift::type>::type; + return makeFuture(makeTryWith([&func]() mutable { + return func(); + })); } template Future makeFuture(std::exception_ptr const& e) { - Promise p; - p.setException(e); - return p.getFuture(); + return makeFuture(Try(e)); } template Future makeFuture(exception_wrapper ew) { - Promise p; - p.setException(std::move(ew)); - return p.getFuture(); + return makeFuture(Try(std::move(ew))); } template typename std::enable_if::value, Future>::type makeFuture(E const& e) { - Promise p; - p.setException(make_exception_wrapper(e)); - return p.getFuture(); + return makeFuture(Try(make_exception_wrapper(e))); } template Future makeFuture(Try&& t) { - Promise::type> p; - p.fulfilTry(std::move(t)); - return p.getFuture(); + return Future(new detail::Core(std::move(t))); } -template <> -inline Future makeFuture(Try&& t) { - if (t.hasException()) { - return makeFuture(std::move(t.exception())); - } else { - return makeFuture(); - } +// via +Future via(Executor* executor, int8_t priority) { + return makeFuture().via(executor, priority); } -// via -template -Future via(Executor* executor) { - return makeFuture().via(executor); +// mapSetCallback calls func(i, Try) when every future completes + +template +void mapSetCallback(InputIterator first, InputIterator last, F func) { + for (size_t i = 0; first != last; ++first, ++i) { + first->setCallback_([func, i](Try&& t) { + func(i, std::move(t)); + }); + } } -// when (variadic) +// collectAll (variadic) template -typename detail::VariadicContext< +typename detail::CollectAllVariadicContext< typename std::decay::type::value_type...>::type -whenAll(Fs&&... fs) -{ - auto ctx = - new detail::VariadicContext::type::value_type...>(); - ctx->total = sizeof...(fs); - auto f_saved = ctx->p.getFuture(); - detail::whenAllVariadicHelper(ctx, - std::forward::type>(fs)...); - return f_saved; +collectAll(Fs&&... fs) { + auto ctx = std::make_shared::type::value_type...>>(); + detail::collectVariadicHelper( + ctx, std::forward::type>(fs)...); + return ctx->p.getFuture(); } -// when (iterator) +// collectAll (iterator) template Future< std::vector< Try::value_type::value_type>>> -whenAll(InputIterator first, InputIterator last) -{ +collectAll(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - if (first >= last) { - return makeFuture(std::vector>()); - } - size_t n = std::distance(first, last); + struct CollectAllContext { + CollectAllContext(int n) : results(n) {} + ~CollectAllContext() { + p.setValue(std::move(results)); + } + Promise>> p; + std::vector> results; + }; - auto ctx = new detail::WhenAllContext(); + auto ctx = std::make_shared(std::distance(first, last)); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + ctx->results[i] = std::move(t); + }); + return ctx->p.getFuture(); +} - ctx->results.resize(n); +// collect (iterator) - auto f_saved = ctx->p.getFuture(); +namespace detail { - for (size_t i = 0; first != last; ++first, ++i) { - assert(i < n); - auto& f = *first; - f.setCallback_([ctx, i, n](Try&& t) { - ctx->results[i] = std::move(t); - if (++ctx->count == n) { - ctx->p.setValue(std::move(ctx->results)); - delete ctx; - } - }); +template +struct CollectContext { + struct Nothing { + explicit Nothing(int /* n */) {} + }; + + using Result = typename std::conditional< + std::is_void::value, + void, + std::vector>::type; + + using InternalResult = typename std::conditional< + std::is_void::value, + Nothing, + std::vector>>::type; + + explicit CollectContext(int n) : result(n) {} + ~CollectContext() { + if (!threw.exchange(true)) { + // map Optional -> T + std::vector finalResult; + finalResult.reserve(result.size()); + std::transform(result.begin(), result.end(), + std::back_inserter(finalResult), + [](Optional& o) { return std::move(o.value()); }); + p.setValue(std::move(finalResult)); + } + } + inline void setPartialResult(size_t i, Try& t) { + result[i] = std::move(t.value()); } + Promise p; + InternalResult result; + std::atomic threw {false}; +}; + +} + +template +Future::value_type::value_type>::Result> +collect(InputIterator first, InputIterator last) { + typedef + typename std::iterator_traits::value_type::value_type T; - return f_saved; + auto ctx = std::make_shared>( + std::distance(first, last)); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + if (t.hasException()) { + if (!ctx->threw.exchange(true)) { + ctx->p.setException(std::move(t.exception())); + } + } else if (!ctx->threw) { + ctx->setPartialResult(i, t); + } + }); + return ctx->p.getFuture(); +} + +// collect (variadic) + +template +typename detail::CollectVariadicContext< + typename std::decay::type::value_type...>::type +collect(Fs&&... fs) { + auto ctx = std::make_shared::type::value_type...>>(); + detail::collectVariadicHelper( + ctx, std::forward::type>(fs)...); + return ctx->p.getFuture(); } +// collectAny (iterator) + template Future< std::pair::value_type::value_type> > > -whenAny(InputIterator first, InputIterator last) { + std::iterator_traits::value_type::value_type>>> +collectAny(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - auto ctx = new detail::WhenAnyContext(std::distance(first, last)); - auto f_saved = ctx->p.getFuture(); - - for (size_t i = 0; first != last; first++, i++) { - auto& f = *first; - f.setCallback_([i, ctx](Try&& t) { - if (!ctx->done.exchange(true)) { - ctx->p.setValue(std::make_pair(i, std::move(t))); - } - ctx->decref(); - }); - } + struct CollectAnyContext { + CollectAnyContext() {}; + Promise>> p; + std::atomic done {false}; + }; - return f_saved; + auto ctx = std::make_shared(); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + if (!ctx->done.exchange(true)) { + ctx->p.setValue(std::make_pair(i, std::move(t))); + } + }); + return ctx->p.getFuture(); } +// collectN (iterator) + template Future::value_type::value_type>>>> -whenN(InputIterator first, InputIterator last, size_t n) { +collectN(InputIterator first, InputIterator last, size_t n) { typedef typename std::iterator_traits::value_type::value_type T; typedef std::vector>> V; - struct ctx_t { + struct CollectNContext { V v; - size_t completed; + std::atomic completed = {0}; Promise p; }; - auto ctx = std::make_shared(); - ctx->completed = 0; - - // for each completed Future, increase count and add to vector, until we - // have n completed futures at which point we fulfil our Promise with the - // vector - auto it = first; - size_t i = 0; - while (it != last) { - it->then([ctx, n, i](Try&& t) { - auto& v = ctx->v; + auto ctx = std::make_shared(); + + if (size_t(std::distance(first, last)) < n) { + ctx->p.setException(std::runtime_error("Not enough futures")); + } else { + // for each completed Future, increase count and add to vector, until we + // have n completed futures at which point we fulfil our Promise with the + // vector + mapSetCallback(first, last, [ctx, n](size_t i, Try&& t) { auto c = ++ctx->completed; if (c <= n) { assert(ctx->v.size() < n); - v.push_back(std::make_pair(i, std::move(t))); + ctx->v.emplace_back(i, std::move(t)); if (c == n) { - ctx->p.fulfilTry(Try(std::move(v))); + ctx->p.setTry(Try(std::move(ctx->v))); } } }); - - it++; - i++; - } - - if (i < n) { - ctx->p.setException(std::runtime_error("Not enough futures")); } return ctx->p.getFuture(); } -namespace { - template - void getWaitHelper(Future* f) { - // If we already have a value do the cheap thing - if (f->isReady()) { - return; - } +// reduce (iterator) + +template +Future reduce(It first, It last, T&& initial, F&& func) { + if (first == last) { + return makeFuture(std::move(initial)); + } + + typedef typename std::iterator_traits::value_type::value_type ItT; + typedef typename std::conditional< + detail::callableWith&&>::value, Try, ItT>::type Arg; + typedef isTry IsTry; - folly::Baton<> baton; - f->then([&](Try const&) { - baton.post(); + folly::MoveWrapper minitial(std::move(initial)); + auto sfunc = std::make_shared(std::move(func)); + + auto f = first->then([minitial, sfunc](Try& head) mutable { + return (*sfunc)(std::move(*minitial), + head.template get()); + }); + + for (++first; first != last; ++first) { + f = collectAll(f, *first).then([sfunc](std::tuple, Try>& t) { + return (*sfunc)(std::move(std::get<0>(t).value()), + // Either return a ItT&& or a Try&& depending + // on the type of the argument of func. + std::get<1>(t).template get()); }); - baton.wait(); } - template - Future getWaitTimeoutHelper(Future* f, Duration dur) { - // TODO make and use variadic whenAny #5877971 - Promise p; - auto token = std::make_shared>(); - folly::Baton<> baton; - - folly::detail::getTimekeeperSingleton()->after(dur) - .then([&,token](Try const& t) { - if (token->exchange(true) == false) { - if (t.hasException()) { - p.setException(std::move(t.exception())); - } else { - p.setException(TimedOut()); - } - baton.post(); - } - }); + return f; +} - f->then([&, token](Try&& t) { - if (token->exchange(true) == false) { - p.fulfilTry(std::move(t)); - baton.post(); +// window (collection) + +template +std::vector> +window(Collection input, F func, size_t n) { + struct WindowContext { + WindowContext(Collection&& i, F&& fn) + : input_(std::move(i)), promises_(input_.size()), + func_(std::move(fn)) + {} + std::atomic i_ {0}; + Collection input_; + std::vector> promises_; + F func_; + + static inline void spawn(const std::shared_ptr& ctx) { + size_t i = ctx->i_++; + if (i < ctx->input_.size()) { + // Using setCallback_ directly since we don't need the Future + ctx->func_(std::move(ctx->input_[i])).setCallback_( + // ctx is captured by value + [ctx, i](Try&& t) { + ctx->promises_[i].setTry(std::move(t)); + // Chain another future onto this one + spawn(std::move(ctx)); + }); } - }); + } + }; + + auto max = std::min(n, input.size()); - baton.wait(); - return p.getFuture(); + auto ctx = std::make_shared( + std::move(input), std::move(func)); + + for (size_t i = 0; i < max; ++i) { + // Start the first n Futures + WindowContext::spawn(ctx); } -} -template -T Future::get() { - getWaitHelper(this); + std::vector> futures; + futures.reserve(ctx->promises_.size()); + for (auto& promise : ctx->promises_) { + futures.emplace_back(promise.getFuture()); + } - // Big assumption here: the then() call above, since it doesn't move out - // the value, leaves us with a value to return here. This would be a big - // no-no in user code, but I'm invoking internal developer privilege. This - // is slightly more efficient (save a move()) especially if there's an - // exception (save a throw). - return std::move(value()); + return futures; } -template <> -inline void Future::get() { - getWaitHelper(this); - value(); -} +// reduce template -T Future::get(Duration dur) { - return std::move(getWaitTimeoutHelper(this, dur).value()); +template +Future Future::reduce(I&& initial, F&& func) { + folly::MoveWrapper minitial(std::move(initial)); + folly::MoveWrapper mfunc(std::move(func)); + return then([minitial, mfunc](T& vals) mutable { + auto ret = std::move(*minitial); + for (auto& val : vals) { + ret = (*mfunc)(std::move(ret), std::move(val)); + } + return ret; + }); } -template <> -inline void Future::get(Duration dur) { - getWaitTimeoutHelper(this, dur).value(); -} +// unorderedReduce (iterator) -template -T Future::getVia(DrivableExecutor* e) { - while (!isReady()) { - e->drive(); +template +Future unorderedReduce(It first, It last, T initial, F func) { + if (first == last) { + return makeFuture(std::move(initial)); } - return std::move(value()); -} -template <> -inline void Future::getVia(DrivableExecutor* e) { - while (!isReady()) { - e->drive(); - } - value(); + typedef isTry IsTry; + + struct UnorderedReduceContext { + UnorderedReduceContext(T&& memo, F&& fn, size_t n) + : lock_(), memo_(makeFuture(std::move(memo))), + func_(std::move(fn)), numThens_(0), numFutures_(n), promise_() + {}; + folly::MicroSpinLock lock_; // protects memo_ and numThens_ + Future memo_; + F func_; + size_t numThens_; // how many Futures completed and called .then() + size_t numFutures_; // how many Futures in total + Promise promise_; + }; + + auto ctx = std::make_shared( + std::move(initial), std::move(func), std::distance(first, last)); + + mapSetCallback( + first, + last, + [ctx](size_t /* i */, Try&& t) { + folly::MoveWrapper> mt(std::move(t)); + // Futures can be completed in any order, simultaneously. + // To make this non-blocking, we create a new Future chain in + // the order of completion to reduce the values. + // The spinlock just protects chaining a new Future, not actually + // executing the reduce, which should be really fast. + folly::MSLGuard lock(ctx->lock_); + ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable { + // Either return a ItT&& or a Try&& depending + // on the type of the argument of func. + return ctx->func_(std::move(v), + mt->template get()); + }); + if (++ctx->numThens_ == ctx->numFutures_) { + // After reducing the value of the last Future, fulfill the Promise + ctx->memo_.setCallback_( + [ctx](Try&& t2) { ctx->promise_.setValue(std::move(t2)); }); + } + }); + + return ctx->promise_.getFuture(); } +// within + template Future Future::within(Duration dur, Timekeeper* tk) { return within(dur, TimedOut(), tk); @@ -648,41 +902,49 @@ template Future Future::within(Duration dur, E e, Timekeeper* tk) { struct Context { - Context(E ex) : exception(std::move(ex)), promise(), token(false) {} + Context(E ex) : exception(std::move(ex)), promise() {} E exception; + Future thisFuture; Promise promise; - std::atomic token; + std::atomic token {false}; }; - auto ctx = std::make_shared(std::move(e)); + std::shared_ptr tks; if (!tk) { - tk = folly::detail::getTimekeeperSingleton(); + tks = folly::detail::getTimekeeperSingleton(); + tk = DCHECK_NOTNULL(tks.get()); } - tk->after(dur) - .then([ctx](Try const& t) { - if (ctx->token.exchange(true) == false) { - if (t.hasException()) { - ctx->promise.setException(std::move(t.exception())); - } else { - ctx->promise.setException(std::move(ctx->exception)); - } - } - }); + auto ctx = std::make_shared(std::move(e)); - this->then([ctx](Try&& t) { + ctx->thisFuture = this->then([ctx](Try&& t) mutable { + // TODO: "this" completed first, cancel "after" if (ctx->token.exchange(true) == false) { - ctx->promise.fulfilTry(std::move(t)); + ctx->promise.setTry(std::move(t)); } }); - return ctx->promise.getFuture(); + tk->after(dur).then([ctx](Try const& t) mutable { + // "after" completed first, cancel "this" + ctx->thisFuture.raise(TimedOut()); + if (ctx->token.exchange(true) == false) { + if (t.hasException()) { + ctx->promise.setException(std::move(t.exception())); + } else { + ctx->promise.setException(std::move(ctx->exception)); + } + } + }); + + return ctx->promise.getFuture().via(getExecutor()); } +// delayed + template Future Future::delayed(Duration dur, Timekeeper* tk) { - return whenAll(*this, futures::sleep(dur, tk)) - .then([](std::tuple, Try> tup) { + return collectAll(*this, futures::sleep(dur, tk)) + .then([](std::tuple, Try> tup) { Try& t = std::get<0>(tup); return makeFuture(std::move(t)); }); @@ -692,34 +954,30 @@ namespace detail { template void waitImpl(Future& f) { - Baton<> baton; - f = f.then([&](Try t) { - baton.post(); - return makeFuture(std::move(t)); - }); + // short-circuit if there's nothing to do + if (f.isReady()) return; + + FutureBatonType baton; + f.setCallback_([&](const Try& /* t */) { baton.post(); }); baton.wait(); - // There's a race here between the return here and the actual finishing of - // the future. f is completed, but the setup may not have finished on done - // after the baton has posted. - while (!f.isReady()) { - std::this_thread::yield(); - } + assert(f.isReady()); } template void waitImpl(Future& f, Duration dur) { - auto baton = std::make_shared>(); - f = f.then([baton](Try t) { + // short-circuit if there's nothing to do + if (f.isReady()) return; + + folly::MoveWrapper> promise; + auto ret = promise->getFuture(); + auto baton = std::make_shared(); + f.setCallback_([baton, promise](Try&& t) mutable { + promise->setTry(std::move(t)); baton->post(); - return makeFuture(std::move(t)); }); - // Let's preserve the invariant that if we did not timeout (timed_wait returns - // true), then the returned Future is complete when it is returned to the - // caller. We need to wait out the race for that Future to complete. - if (baton->timed_wait(std::chrono::system_clock::now() + dur)) { - while (!f.isReady()) { - std::this_thread::yield(); - } + f = std::move(ret); + if (baton->timed_wait(dur)) { + assert(f.isReady()); } } @@ -768,39 +1026,332 @@ Future&& Future::waitVia(DrivableExecutor* e) && { return std::move(*this); } -namespace futures { +template +T Future::get() { + return std::move(wait().value()); +} - namespace { - template - Future chainHelper(F, Callbacks...); +template +T Future::get(Duration dur) { + wait(dur); + if (isReady()) { + return std::move(value()); + } else { + throw TimedOut(); + } +} + +template +T Future::getVia(DrivableExecutor* e) { + return std::move(waitVia(e).value()); +} - template - Future chainHelper(Future f) { - return f; +namespace detail { + template + struct TryEquals { + static bool equals(const Try& t1, const Try& t2) { + return t1.value() == t2.value(); } + }; +} + +template +Future Future::willEqual(Future& f) { + return collectAll(*this, f).then([](const std::tuple, Try>& t) { + if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) { + return detail::TryEquals::equals(std::get<0>(t), std::get<1>(t)); + } else { + return false; + } + }); +} - template - Future chainHelper(F f, Fn fn, Callbacks... fns) { - return chainHelper(f.then(fn), fns...); +template +template +Future Future::filter(F predicate) { + auto p = folly::makeMoveWrapper(std::move(predicate)); + return this->then([p](T val) { + T const& valConstRef = val; + if (!(*p)(valConstRef)) { + throw PredicateDoesNotObtain(); } + return val; + }); +} + +template +template +auto Future::thenMulti(Callback&& fn) + -> decltype(this->then(std::forward(fn))) { + // thenMulti with one callback is just a then + return then(std::forward(fn)); +} + +template +template +auto Future::thenMulti(Callback&& fn, Callbacks&&... fns) + -> decltype(this->then(std::forward(fn)). + thenMulti(std::forward(fns)...)) { + // thenMulti with two callbacks is just then(a).thenMulti(b, ...) + return then(std::forward(fn)). + thenMulti(std::forward(fns)...); +} + +template +template +auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn, + Callbacks&&... fns) + -> decltype(this->then(std::forward(fn)). + thenMulti(std::forward(fns)...)) { + // thenMultiExecutor with two callbacks is + // via(x).then(a).thenMulti(b, ...).via(oldX) + auto oldX = getExecutor(); + setExecutor(x); + return then(std::forward(fn)). + thenMulti(std::forward(fns)...).via(oldX); +} + +template +template +auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn) + -> decltype(this->then(std::forward(fn))) { + // thenMulti with one callback is just a then with an executor + return then(x, std::forward(fn)); +} + +template +inline Future when(bool p, F thunk) { + return p ? thunk().unit() : makeFuture(); +} + +template +Future whileDo(P predicate, F thunk) { + if (predicate()) { + return thunk().then([=] { + return whileDo(predicate, thunk); + }); } + return makeFuture(); +} - template - std::function(Try)> - chain(Callbacks... fns) { - MoveWrapper> pw; - MoveWrapper> fw(chainHelper(pw->getFuture(), fns...)); - return [=](Try t) mutable { - pw->fulfilTry(std::move(t)); - return std::move(*fw); - }; +template +Future times(const int n, F thunk) { + auto count = folly::makeMoveWrapper( + std::unique_ptr>(new std::atomic(0)) + ); + return folly::whileDo([=]() mutable { + return (*count)->fetch_add(1) < n; + }, thunk); +} + +namespace futures { + template + std::vector> map(It first, It last, F func) { + std::vector> results; + for (auto it = first; it != last; it++) { + results.push_back(it->then(func)); + } + return results; } +} +namespace futures { + +namespace detail { + +struct retrying_policy_raw_tag {}; +struct retrying_policy_fut_tag {}; + +template +struct retrying_policy_traits { + using ew = exception_wrapper; + FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator()); + template + using has_op = typename std::integral_constant::value || + has_op_call::value>; + using is_raw = has_op; + using is_fut = has_op>; + using tag = typename std::conditional< + is_raw::value, retrying_policy_raw_tag, typename std::conditional< + is_fut::value, retrying_policy_fut_tag, void>::type>::type; +}; + +template +typename std::result_of::type +retrying(size_t k, Policy&& p, FF&& ff) { + using F = typename std::result_of::type; + using T = typename F::value_type; + auto f = ff(k++); + auto pm = makeMoveWrapper(p); + auto ffm = makeMoveWrapper(ff); + return f.onError([=](exception_wrapper x) mutable { + auto q = (*pm)(k, x); + auto xm = makeMoveWrapper(std::move(x)); + return q.then([=](bool r) mutable { + return r + ? retrying(k, pm.move(), ffm.move()) + : makeFuture(xm.move()); + }); + }); } -} // namespace folly +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) { + auto pm = makeMoveWrapper(std::move(p)); + auto q = [=](size_t k, exception_wrapper x) { + return makeFuture((*pm)(k, x)); + }; + return retrying(0, std::move(q), std::forward(ff)); +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) { + return retrying(0, std::forward(p), std::forward(ff)); +} + +// jittered exponential backoff, clamped to [backoff_min, backoff_max] +template +Duration retryingJitteredExponentialBackoffDur( + size_t n, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG& rng) { + using d = Duration; + auto dist = std::normal_distribution(0.0, jitter_param); + auto jitter = std::exp(dist(rng)); + auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1))); + return std::max(backoff_min, std::min(backoff_max, backoff)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p) { + auto pm = makeMoveWrapper(std::move(p)); + auto rngp = std::make_shared(std::move(rng)); + return [=](size_t n, const exception_wrapper& ex) mutable { + if (n == max_tries) { return makeFuture(false); } + return (*pm)(n, ex).then([=](bool v) { + if (!v) { return makeFuture(false); } + auto backoff = detail::retryingJitteredExponentialBackoffDur( + n, backoff_min, backoff_max, jitter_param, *rngp); + return futures::sleep(backoff).then([] { return true; }); + }); + }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p, + retrying_policy_raw_tag) { + auto pm = makeMoveWrapper(std::move(p)); + auto q = [=](size_t n, const exception_wrapper& e) { + return makeFuture((*pm)(n, e)); + }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(q)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p, + retrying_policy_fut_tag) { + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(p)); +} + +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retrying(std::forward(p), std::forward(ff), tag()); +} + +inline +std::function +retryingPolicyBasic( + size_t max_tries) { + return [=](size_t n, const exception_wrapper&) { return n < max_tries; }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(p), + tag()); +} + +inline +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param) { + auto p = [](size_t, const exception_wrapper&) { return true; }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + ThreadLocalPRNG(), + std::move(p)); +} -// I haven't included a Future specialization because I don't forsee us -// using it, however it is not difficult to add when needed. Refer to -// Future for guidance. std::future and boost::future code would also be -// instructive. +} + +// Instantiate the most common Future types to save compile time +extern template class Future; +extern template class Future; +extern template class Future; +extern template class Future; +extern template class Future; +extern template class Future; + +} // namespace folly