X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Ffutures%2FFuture-inl.h;h=091357ee5b23a796c14631cd2fcf001047b7e704;hb=a1614feea3f3c0beb75fb2dc43ec45b3e5d57223;hp=b3ca83e64c81346429fa7bf3b8b37489c04a31f9;hpb=e4e2520ac491eb7d13cde8aea87dd8668b622da0;p=folly.git diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index b3ca83e6..091357ee 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -16,20 +16,39 @@ #pragma once +#include +#include #include +#include #include - -#include +#include #include +#include +#include #include #include +#if defined(__ANDROID__) || defined(__APPLE__) +#define FOLLY_FUTURE_USING_FIBER 0 +#else +#define FOLLY_FUTURE_USING_FIBER 1 +#include +#endif + namespace folly { class Timekeeper; namespace detail { - Timekeeper* getTimekeeperSingleton(); +#if FOLLY_FUTURE_USING_FIBER +typedef folly::fibers::Baton FutureBatonType; +#else +typedef folly::Baton<> FutureBatonType; +#endif +} + +namespace detail { + std::shared_ptr getTimekeeperSingleton(); } template @@ -44,25 +63,14 @@ Future& Future::operator=(Future&& other) noexcept { } template -template ::value, void*>::type> -Future::Future(T2&& val) : core_(nullptr) { - Promise p; - p.setValue(std::forward(val)); - *this = p.getFuture(); -} +template +Future::Future(T2&& val) + : core_(new detail::Core(Try(std::forward(val)))) {} template -template ::value, - int>::type> -Future::Future() : core_(nullptr) { - Promise p; - p.setValue(); - *this = p.getFuture(); -} - +template +Future::Future() + : core_(new detail::Core(Try(T()))) {} template Future::~Future() { @@ -117,13 +125,12 @@ Future::thenImplementation(F func, detail::argResult) { // wrap these so we can move them into the lambda folly::MoveWrapper> p; + p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); folly::MoveWrapper funcm(std::forward(func)); // grab the Future now before we lose our handle on the Promise auto f = p->getFuture(); - if (getExecutor()) { - f.setExecutor(getExecutor()); - } + f.core_->setExecutorNoLock(getExecutor()); /* This is a bit tricky. @@ -184,13 +191,12 @@ Future::thenImplementation(F func, detail::argResult) { // wrap these so we can move them into the lambda folly::MoveWrapper> p; + p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); folly::MoveWrapper funcm(std::forward(func)); // grab the Future now before we lose our handle on the Promise auto f = p->getFuture(); - if (getExecutor()) { - f.setExecutor(getExecutor()); - } + f.core_->setExecutorNoLock(getExecutor()); setCallback_( [p, funcm](Try&& t) mutable { @@ -239,8 +245,8 @@ auto Future::then(Executor* x, Arg&& arg, Args&&... args) } template -Future Future::then() { - return then([] (Try&& t) {}); +Future Future::then() { + return then([] () {}); } // onError where the callback returns T @@ -257,6 +263,7 @@ Future::onError(F&& func) { "Return type of onError callback must be T or Future"); Promise p; + p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); auto f = p.getFuture(); auto pm = folly::makeMoveWrapper(std::move(p)); auto funcm = folly::makeMoveWrapper(std::move(func)); @@ -314,7 +321,7 @@ template template Future Future::ensure(F func) { MoveWrapper funcw(std::move(func)); - return this->then([funcw](Try&& t) { + return this->then([funcw](Try&& t) mutable { (*funcw)(); return makeFuture(std::move(t)); }); @@ -423,22 +430,31 @@ Optional> Future::poll() { } template -inline Future Future::via(Executor* executor) && { +inline Future Future::via(Executor* executor, int8_t priority) && { throwIfInvalid(); - setExecutor(executor); + setExecutor(executor, priority); return std::move(*this); } template -inline Future Future::via(Executor* executor) & { +inline Future Future::via(Executor* executor, int8_t priority) & { throwIfInvalid(); MoveWrapper> p; auto f = p->getFuture(); then([p](Try&& t) mutable { p->setTry(std::move(t)); }); - return std::move(f).via(executor); + return std::move(f).via(executor, priority); +} + + +template +auto via(Executor* x, Func func) + -> Future::Inner> +{ + // TODO make this actually more performant. :-P #7260175 + return via(x).then(func); } template @@ -447,6 +463,16 @@ bool Future::isReady() const { return core_->ready(); } +template +bool Future::hasValue() { + return getTry().hasValue(); +} + +template +bool Future::hasException() { + return getTry().hasException(); +} + template void Future::raise(exception_wrapper exception) { core_->raise(std::move(exception)); @@ -456,97 +482,97 @@ void Future::raise(exception_wrapper exception) { template Future::type> makeFuture(T&& t) { - Promise::type> p; - p.setValue(std::forward(t)); - return p.getFuture(); + return makeFuture(Try::type>(std::forward(t))); } inline // for multiple translation units -Future makeFuture() { - Promise p; - p.setValue(); - return p.getFuture(); +Future makeFuture() { + return makeFuture(Unit{}); } +// makeFutureWith(Future()) -> Future template -auto makeFutureWith( - F&& func, - typename std::enable_if::value, bool>::type sdf) - -> Future { - Promise p; - p.setWith( - [&func]() { - return (func)(); - }); - return p.getFuture(); +typename std::enable_if::type>::value, + typename std::result_of::type>::type +makeFutureWith(F&& func) { + using InnerType = + typename isFuture::type>::Inner; + try { + return func(); + } catch (std::exception& e) { + return makeFuture( + exception_wrapper(std::current_exception(), e)); + } catch (...) { + return makeFuture(exception_wrapper(std::current_exception())); + } } +// makeFutureWith(T()) -> Future +// makeFutureWith(void()) -> Future template -auto makeFutureWith(F const& func) -> Future { - F copy = func; - return makeFutureWith(std::move(copy)); +typename std::enable_if< + !(isFuture::type>::value), + Future::type>::type>>::type +makeFutureWith(F&& func) { + using LiftedResult = + typename Unit::Lift::type>::type; + return makeFuture(makeTryWith([&func]() mutable { + return func(); + })); } template Future makeFuture(std::exception_ptr const& e) { - Promise p; - p.setException(e); - return p.getFuture(); + return makeFuture(Try(e)); } template Future makeFuture(exception_wrapper ew) { - Promise p; - p.setException(std::move(ew)); - return p.getFuture(); + return makeFuture(Try(std::move(ew))); } template typename std::enable_if::value, Future>::type makeFuture(E const& e) { - Promise p; - p.setException(make_exception_wrapper(e)); - return p.getFuture(); + return makeFuture(Try(make_exception_wrapper(e))); } template Future makeFuture(Try&& t) { - Promise::type> p; - p.setTry(std::move(t)); - return p.getFuture(); + return Future(new detail::Core(std::move(t))); } -template <> -inline Future makeFuture(Try&& t) { - if (t.hasException()) { - return makeFuture(std::move(t.exception())); - } else { - return makeFuture(); - } +// via +Future via(Executor* executor, int8_t priority) { + return makeFuture().via(executor, priority); } -// via -inline Future via(Executor* executor) { - return makeFuture().via(executor); +// mapSetCallback calls func(i, Try) when every future completes + +template +void mapSetCallback(InputIterator first, InputIterator last, F func) { + for (size_t i = 0; first != last; ++first, ++i) { + first->setCallback_([func, i](Try&& t) { + func(i, std::move(t)); + }); + } } -// when (variadic) +// collectAll (variadic) template -typename detail::VariadicContext< +typename detail::CollectAllVariadicContext< typename std::decay::type::value_type...>::type collectAll(Fs&&... fs) { - auto ctx = - new detail::VariadicContext::type::value_type...>(); - ctx->total = sizeof...(fs); - auto f_saved = ctx->p.getFuture(); - detail::collectAllVariadicHelper(ctx, - std::forward::type>(fs)...); - return f_saved; + auto ctx = std::make_shared::type::value_type...>>(); + detail::collectVariadicHelper( + ctx, std::forward::type>(fs)...); + return ctx->p.getFuture(); } -// when (iterator) +// collectAll (iterator) template Future< @@ -556,183 +582,127 @@ collectAll(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - if (first >= last) { - return makeFuture(std::vector>()); - } - size_t n = std::distance(first, last); - - auto ctx = new detail::WhenAllContext(); - - ctx->results.resize(n); - - auto f_saved = ctx->p.getFuture(); - - for (size_t i = 0; first != last; ++first, ++i) { - assert(i < n); - auto& f = *first; - f.setCallback_([ctx, i, n](Try t) { - ctx->results[i] = std::move(t); - if (++ctx->count == n) { - ctx->p.setValue(std::move(ctx->results)); - delete ctx; - } - }); - } + struct CollectAllContext { + CollectAllContext(int n) : results(n) {} + ~CollectAllContext() { + p.setValue(std::move(results)); + } + Promise>> p; + std::vector> results; + }; - return f_saved; + auto ctx = std::make_shared(std::distance(first, last)); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + ctx->results[i] = std::move(t); + }); + return ctx->p.getFuture(); } -namespace detail { - -template struct CollectContextHelper; - -template -struct CollectContextHelper::value>::type> { - static inline std::vector&& getResults(std::vector& results) { - return std::move(results); - } -}; +// collect (iterator) -template -struct CollectContextHelper::value>::type> { - static inline std::vector getResults(std::vector& results) { - std::vector finalResults; - finalResults.reserve(results.size()); - for (auto& opt : results) { - finalResults.push_back(std::move(opt.value())); - } - return finalResults; - } -}; +namespace detail { template struct CollectContext { + struct Nothing { + explicit Nothing(int /* n */) {} + }; - typedef typename std::conditional< - std::is_default_constructible::value, - T, - Optional - >::type VecT; - - explicit CollectContext(int n) : count(0), success_count(0), threw(false) { - results.resize(n); - } - - Promise> p; - std::vector results; - std::atomic count, success_count; - std::atomic_bool threw; - - typedef std::vector result_type; - - static inline Future> makeEmptyFuture() { - return makeFuture(std::vector()); - } - - inline void setValue() { - p.setValue(CollectContextHelper::getResults(results)); - } - - inline void addResult(int i, Try& t) { - results[i] = std::move(t.value()); - } -}; - -template <> -struct CollectContext { - - explicit CollectContext(int n) : count(0), success_count(0), threw(false) {} - - Promise p; - std::atomic count, success_count; - std::atomic_bool threw; - - typedef void result_type; - - static inline Future makeEmptyFuture() { - return makeFuture(); - } - - inline void setValue() { - p.setValue(); + using Result = typename std::conditional< + std::is_void::value, + void, + std::vector>::type; + + using InternalResult = typename std::conditional< + std::is_void::value, + Nothing, + std::vector>>::type; + + explicit CollectContext(int n) : result(n) {} + ~CollectContext() { + if (!threw.exchange(true)) { + // map Optional -> T + std::vector finalResult; + finalResult.reserve(result.size()); + std::transform(result.begin(), result.end(), + std::back_inserter(finalResult), + [](Optional& o) { return std::move(o.value()); }); + p.setValue(std::move(finalResult)); + } } - - inline void addResult(int i, Try& t) { - // do nothing + inline void setPartialResult(size_t i, Try& t) { + result[i] = std::move(t.value()); } + Promise p; + InternalResult result; + std::atomic threw {false}; }; -} // detail +} template Future::value_type::value_type ->::result_type> + typename std::iterator_traits::value_type::value_type>::Result> collect(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - if (first >= last) { - return detail::CollectContext::makeEmptyFuture(); - } - - size_t n = std::distance(first, last); - auto ctx = new detail::CollectContext(n); - auto f_saved = ctx->p.getFuture(); - - for (size_t i = 0; first != last; ++first, ++i) { - assert(i < n); - auto& f = *first; - f.setCallback_([ctx, i, n](Try t) { - - if (t.hasException()) { - if (!ctx->threw.exchange(true)) { - ctx->p.setException(std::move(t.exception())); - } - } else if (!ctx->threw) { - ctx->addResult(i, t); - if (++ctx->success_count == n) { - ctx->setValue(); - } + auto ctx = std::make_shared>( + std::distance(first, last)); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + if (t.hasException()) { + if (!ctx->threw.exchange(true)) { + ctx->p.setException(std::move(t.exception())); } + } else if (!ctx->threw) { + ctx->setPartialResult(i, t); + } + }); + return ctx->p.getFuture(); +} - if (++ctx->count == n) { - delete ctx; - } - }); - } +// collect (variadic) - return f_saved; +template +typename detail::CollectVariadicContext< + typename std::decay::type::value_type...>::type +collect(Fs&&... fs) { + auto ctx = std::make_shared::type::value_type...>>(); + detail::collectVariadicHelper( + ctx, std::forward::type>(fs)...); + return ctx->p.getFuture(); } +// collectAny (iterator) + template Future< std::pair::value_type::value_type> > > + std::iterator_traits::value_type::value_type>>> collectAny(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - auto ctx = new detail::WhenAnyContext(std::distance(first, last)); - auto f_saved = ctx->p.getFuture(); - - for (size_t i = 0; first != last; first++, i++) { - auto& f = *first; - f.setCallback_([i, ctx](Try&& t) { - if (!ctx->done.exchange(true)) { - ctx->p.setValue(std::make_pair(i, std::move(t))); - } - ctx->decref(); - }); - } + struct CollectAnyContext { + CollectAnyContext() {}; + Promise>> p; + std::atomic done {false}; + }; - return f_saved; + auto ctx = std::make_shared(); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + if (!ctx->done.exchange(true)) { + ctx->p.setValue(std::make_pair(i, std::move(t))); + } + }); + return ctx->p.getFuture(); } +// collectN (iterator) + template Future::value_type::value_type>>>> @@ -741,43 +711,36 @@ collectN(InputIterator first, InputIterator last, size_t n) { std::iterator_traits::value_type::value_type T; typedef std::vector>> V; - struct ctx_t { + struct CollectNContext { V v; - size_t completed; + std::atomic completed = {0}; Promise p; }; - auto ctx = std::make_shared(); - ctx->completed = 0; - - // for each completed Future, increase count and add to vector, until we - // have n completed futures at which point we fulfill our Promise with the - // vector - auto it = first; - size_t i = 0; - while (it != last) { - it->then([ctx, n, i](Try&& t) { - auto& v = ctx->v; + auto ctx = std::make_shared(); + + if (size_t(std::distance(first, last)) < n) { + ctx->p.setException(std::runtime_error("Not enough futures")); + } else { + // for each completed Future, increase count and add to vector, until we + // have n completed futures at which point we fulfil our Promise with the + // vector + mapSetCallback(first, last, [ctx, n](size_t i, Try&& t) { auto c = ++ctx->completed; if (c <= n) { assert(ctx->v.size() < n); - v.push_back(std::make_pair(i, std::move(t))); + ctx->v.emplace_back(i, std::move(t)); if (c == n) { - ctx->p.setTry(Try(std::move(v))); + ctx->p.setTry(Try(std::move(ctx->v))); } } }); - - it++; - i++; - } - - if (i < n) { - ctx->p.setException(std::runtime_error("Not enough futures")); } return ctx->p.getFuture(); } +// reduce (iterator) + template Future reduce(It first, It last, T&& initial, F&& func) { if (first == last) { @@ -809,6 +772,57 @@ Future reduce(It first, It last, T&& initial, F&& func) { return f; } +// window (collection) + +template +std::vector> +window(Collection input, F func, size_t n) { + struct WindowContext { + WindowContext(Collection&& i, F&& fn) + : input_(std::move(i)), promises_(input_.size()), + func_(std::move(fn)) + {} + std::atomic i_ {0}; + Collection input_; + std::vector> promises_; + F func_; + + static inline void spawn(const std::shared_ptr& ctx) { + size_t i = ctx->i_++; + if (i < ctx->input_.size()) { + // Using setCallback_ directly since we don't need the Future + ctx->func_(std::move(ctx->input_[i])).setCallback_( + // ctx is captured by value + [ctx, i](Try&& t) { + ctx->promises_[i].setTry(std::move(t)); + // Chain another future onto this one + spawn(std::move(ctx)); + }); + } + } + }; + + auto max = std::min(n, input.size()); + + auto ctx = std::make_shared( + std::move(input), std::move(func)); + + for (size_t i = 0; i < max; ++i) { + // Start the first n Futures + WindowContext::spawn(ctx); + } + + std::vector> futures; + futures.reserve(ctx->promises_.size()); + for (auto& promise : ctx->promises_) { + futures.emplace_back(promise.getFuture()); + } + + return futures; +} + +// reduce + template template Future Future::reduce(I&& initial, F&& func) { @@ -823,6 +837,61 @@ Future Future::reduce(I&& initial, F&& func) { }); } +// unorderedReduce (iterator) + +template +Future unorderedReduce(It first, It last, T initial, F func) { + if (first == last) { + return makeFuture(std::move(initial)); + } + + typedef isTry IsTry; + + struct UnorderedReduceContext { + UnorderedReduceContext(T&& memo, F&& fn, size_t n) + : lock_(), memo_(makeFuture(std::move(memo))), + func_(std::move(fn)), numThens_(0), numFutures_(n), promise_() + {}; + folly::MicroSpinLock lock_; // protects memo_ and numThens_ + Future memo_; + F func_; + size_t numThens_; // how many Futures completed and called .then() + size_t numFutures_; // how many Futures in total + Promise promise_; + }; + + auto ctx = std::make_shared( + std::move(initial), std::move(func), std::distance(first, last)); + + mapSetCallback( + first, + last, + [ctx](size_t /* i */, Try&& t) { + folly::MoveWrapper> mt(std::move(t)); + // Futures can be completed in any order, simultaneously. + // To make this non-blocking, we create a new Future chain in + // the order of completion to reduce the values. + // The spinlock just protects chaining a new Future, not actually + // executing the reduce, which should be really fast. + folly::MSLGuard lock(ctx->lock_); + ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable { + // Either return a ItT&& or a Try&& depending + // on the type of the argument of func. + return ctx->func_(std::move(v), + mt->template get()); + }); + if (++ctx->numThens_ == ctx->numFutures_) { + // After reducing the value of the last Future, fulfill the Promise + ctx->memo_.setCallback_( + [ctx](Try&& t2) { ctx->promise_.setValue(std::move(t2)); }); + } + }); + + return ctx->promise_.getFuture(); +} + +// within + template Future Future::within(Duration dur, Timekeeper* tk) { return within(dur, TimedOut(), tk); @@ -833,41 +902,49 @@ template Future Future::within(Duration dur, E e, Timekeeper* tk) { struct Context { - Context(E ex) : exception(std::move(ex)), promise(), token(false) {} + Context(E ex) : exception(std::move(ex)), promise() {} E exception; + Future thisFuture; Promise promise; - std::atomic token; + std::atomic token {false}; }; - auto ctx = std::make_shared(std::move(e)); + std::shared_ptr tks; if (!tk) { - tk = folly::detail::getTimekeeperSingleton(); + tks = folly::detail::getTimekeeperSingleton(); + tk = DCHECK_NOTNULL(tks.get()); } - tk->after(dur) - .then([ctx](Try const& t) { - if (ctx->token.exchange(true) == false) { - if (t.hasException()) { - ctx->promise.setException(std::move(t.exception())); - } else { - ctx->promise.setException(std::move(ctx->exception)); - } - } - }); + auto ctx = std::make_shared(std::move(e)); - this->then([ctx](Try&& t) { + ctx->thisFuture = this->then([ctx](Try&& t) mutable { + // TODO: "this" completed first, cancel "after" if (ctx->token.exchange(true) == false) { ctx->promise.setTry(std::move(t)); } }); - return ctx->promise.getFuture(); + tk->after(dur).then([ctx](Try const& t) mutable { + // "after" completed first, cancel "this" + ctx->thisFuture.raise(TimedOut()); + if (ctx->token.exchange(true) == false) { + if (t.hasException()) { + ctx->promise.setException(std::move(t.exception())); + } else { + ctx->promise.setException(std::move(ctx->exception)); + } + } + }); + + return ctx->promise.getFuture().via(getExecutor()); } +// delayed + template Future Future::delayed(Duration dur, Timekeeper* tk) { return collectAll(*this, futures::sleep(dur, tk)) - .then([](std::tuple, Try> tup) { + .then([](std::tuple, Try> tup) { Try& t = std::get<0>(tup); return makeFuture(std::move(t)); }); @@ -880,19 +957,10 @@ void waitImpl(Future& f) { // short-circuit if there's nothing to do if (f.isReady()) return; - folly::fibers::Baton baton; - f = f.then([&](Try t) { - baton.post(); - return makeFuture(std::move(t)); - }); + FutureBatonType baton; + f.setCallback_([&](const Try& /* t */) { baton.post(); }); baton.wait(); - - // There's a race here between the return here and the actual finishing of - // the future. f is completed, but the setup may not have finished on done - // after the baton has posted. - while (!f.isReady()) { - std::this_thread::yield(); - } + assert(f.isReady()); } template @@ -900,19 +968,16 @@ void waitImpl(Future& f, Duration dur) { // short-circuit if there's nothing to do if (f.isReady()) return; - auto baton = std::make_shared(); - f = f.then([baton](Try t) { + folly::MoveWrapper> promise; + auto ret = promise->getFuture(); + auto baton = std::make_shared(); + f.setCallback_([baton, promise](Try&& t) mutable { + promise->setTry(std::move(t)); baton->post(); - return makeFuture(std::move(t)); }); - - // Let's preserve the invariant that if we did not timeout (timed_wait returns - // true), then the returned Future is complete when it is returned to the - // caller. We need to wait out the race for that Future to complete. + f = std::move(ret); if (baton->timed_wait(dur)) { - while (!f.isReady()) { - std::this_thread::yield(); - } + assert(f.isReady()); } } @@ -966,11 +1031,6 @@ T Future::get() { return std::move(wait().value()); } -template <> -inline void Future::get() { - wait().value(); -} - template T Future::get(Duration dur) { wait(dur); @@ -981,31 +1041,25 @@ T Future::get(Duration dur) { } } -template <> -inline void Future::get(Duration dur) { - wait(dur); - if (isReady()) { - return; - } else { - throw TimedOut(); - } -} - template T Future::getVia(DrivableExecutor* e) { return std::move(waitVia(e).value()); } -template <> -inline void Future::getVia(DrivableExecutor* e) { - waitVia(e).value(); +namespace detail { + template + struct TryEquals { + static bool equals(const Try& t1, const Try& t2) { + return t1.value() == t2.value(); + } + }; } template Future Future::willEqual(Future& f) { return collectAll(*this, f).then([](const std::tuple, Try>& t) { if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) { - return std::get<0>(t).value() == std::get<1>(t).value(); + return detail::TryEquals::equals(std::get<0>(t), std::get<1>(t)); } else { return false; } @@ -1025,30 +1079,72 @@ Future Future::filter(F predicate) { }); } -namespace futures { - namespace { - template - Future chainHelper(Future f) { - return f; - } +template +template +auto Future::thenMulti(Callback&& fn) + -> decltype(this->then(std::forward(fn))) { + // thenMulti with one callback is just a then + return then(std::forward(fn)); +} - template - Future chainHelper(F f, Fn fn, Callbacks... fns) { - return chainHelper(f.then(fn), fns...); - } - } +template +template +auto Future::thenMulti(Callback&& fn, Callbacks&&... fns) + -> decltype(this->then(std::forward(fn)). + thenMulti(std::forward(fns)...)) { + // thenMulti with two callbacks is just then(a).thenMulti(b, ...) + return then(std::forward(fn)). + thenMulti(std::forward(fns)...); +} + +template +template +auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn, + Callbacks&&... fns) + -> decltype(this->then(std::forward(fn)). + thenMulti(std::forward(fns)...)) { + // thenMultiExecutor with two callbacks is + // via(x).then(a).thenMulti(b, ...).via(oldX) + auto oldX = getExecutor(); + setExecutor(x); + return then(std::forward(fn)). + thenMulti(std::forward(fns)...).via(oldX); +} + +template +template +auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn) + -> decltype(this->then(std::forward(fn))) { + // thenMulti with one callback is just a then with an executor + return then(x, std::forward(fn)); +} - template - std::function(Try)> - chain(Callbacks... fns) { - MoveWrapper> pw; - MoveWrapper> fw(chainHelper(pw->getFuture(), fns...)); - return [=](Try t) mutable { - pw->setTry(std::move(t)); - return std::move(*fw); - }; +template +inline Future when(bool p, F thunk) { + return p ? thunk().unit() : makeFuture(); +} + +template +Future whileDo(P predicate, F thunk) { + if (predicate()) { + return thunk().then([=] { + return whileDo(predicate, thunk); + }); } + return makeFuture(); +} + +template +Future times(const int n, F thunk) { + auto count = folly::makeMoveWrapper( + std::unique_ptr>(new std::atomic(0)) + ); + return folly::whileDo([=]() mutable { + return (*count)->fetch_add(1) < n; + }, thunk); +} +namespace futures { template std::vector> map(It first, It last, F func) { std::vector> results; @@ -1059,9 +1155,203 @@ namespace futures { } } -} // namespace folly +namespace futures { -// I haven't included a Future specialization because I don't forsee us -// using it, however it is not difficult to add when needed. Refer to -// Future for guidance. std::future and boost::future code would also be -// instructive. +namespace detail { + +struct retrying_policy_raw_tag {}; +struct retrying_policy_fut_tag {}; + +template +struct retrying_policy_traits { + using ew = exception_wrapper; + FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator()); + template + using has_op = typename std::integral_constant::value || + has_op_call::value>; + using is_raw = has_op; + using is_fut = has_op>; + using tag = typename std::conditional< + is_raw::value, retrying_policy_raw_tag, typename std::conditional< + is_fut::value, retrying_policy_fut_tag, void>::type>::type; +}; + +template +typename std::result_of::type +retrying(size_t k, Policy&& p, FF&& ff) { + using F = typename std::result_of::type; + using T = typename F::value_type; + auto f = ff(k++); + auto pm = makeMoveWrapper(p); + auto ffm = makeMoveWrapper(ff); + return f.onError([=](exception_wrapper x) mutable { + auto q = (*pm)(k, x); + auto xm = makeMoveWrapper(std::move(x)); + return q.then([=](bool r) mutable { + return r + ? retrying(k, pm.move(), ffm.move()) + : makeFuture(xm.move()); + }); + }); +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) { + auto pm = makeMoveWrapper(std::move(p)); + auto q = [=](size_t k, exception_wrapper x) { + return makeFuture((*pm)(k, x)); + }; + return retrying(0, std::move(q), std::forward(ff)); +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) { + return retrying(0, std::forward(p), std::forward(ff)); +} + +// jittered exponential backoff, clamped to [backoff_min, backoff_max] +template +Duration retryingJitteredExponentialBackoffDur( + size_t n, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG& rng) { + using d = Duration; + auto dist = std::normal_distribution(0.0, jitter_param); + auto jitter = std::exp(dist(rng)); + auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1))); + return std::max(backoff_min, std::min(backoff_max, backoff)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p) { + auto pm = makeMoveWrapper(std::move(p)); + auto rngp = std::make_shared(std::move(rng)); + return [=](size_t n, const exception_wrapper& ex) mutable { + if (n == max_tries) { return makeFuture(false); } + return (*pm)(n, ex).then([=](bool v) { + if (!v) { return makeFuture(false); } + auto backoff = detail::retryingJitteredExponentialBackoffDur( + n, backoff_min, backoff_max, jitter_param, *rngp); + return futures::sleep(backoff).then([] { return true; }); + }); + }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p, + retrying_policy_raw_tag) { + auto pm = makeMoveWrapper(std::move(p)); + auto q = [=](size_t n, const exception_wrapper& e) { + return makeFuture((*pm)(n, e)); + }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(q)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p, + retrying_policy_fut_tag) { + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(p)); +} + +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retrying(std::forward(p), std::forward(ff), tag()); +} + +inline +std::function +retryingPolicyBasic( + size_t max_tries) { + return [=](size_t n, const exception_wrapper&) { return n < max_tries; }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(p), + tag()); +} + +inline +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param) { + auto p = [](size_t, const exception_wrapper&) { return true; }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + ThreadLocalPRNG(), + std::move(p)); +} + +} + +// Instantiate the most common Future types to save compile time +extern template class Future; +extern template class Future; +extern template class Future; +extern template class Future; +extern template class Future; +extern template class Future; + +} // namespace folly