X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Ffutures%2FFuture-inl.h;h=cf3e334437ca440899e7a468770e6b3494c1a09f;hp=135440a9e1ebfc49569c54bcb492c0884fb04833;hb=41365ea66d92749ba78f73d60325e5447beb04ab;hpb=b79405c612523670e6b4238c8ff85681734b7075 diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index 135440a9..cf3e3344 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,20 +16,39 @@ #pragma once +#include +#include #include +#include #include - -#include +#include #include +#include +#include #include #include +#if FOLLY_MOBILE || defined(__APPLE__) +#define FOLLY_FUTURE_USING_FIBER 0 +#else +#define FOLLY_FUTURE_USING_FIBER 1 +#include +#endif + namespace folly { class Timekeeper; namespace detail { - Timekeeper* getTimekeeperSingleton(); +#if FOLLY_FUTURE_USING_FIBER +typedef folly::fibers::Baton FutureBatonType; +#else +typedef folly::Baton<> FutureBatonType; +#endif +} + +namespace detail { + std::shared_ptr getTimekeeperSingleton(); } template @@ -44,22 +63,14 @@ Future& Future::operator=(Future&& other) noexcept { } template -template -Future::Future(T2&& val) : core_(nullptr) { - Promise p; - p.setValue(std::forward(val)); - *this = p.getFuture(); -} - -template <> -template ::value, int>::type> -Future::Future() : core_(nullptr) { - Promise p; - p.setValue(); - *this = p.getFuture(); -} +template +Future::Future(T2&& val) + : core_(new detail::Core(Try(std::forward(val)))) {} +template +template +Future::Future(typename std::enable_if::value>::type*) + : core_(new detail::Core(Try(T()))) {} template Future::~Future() { @@ -84,7 +95,7 @@ template template void Future::setCallback_(F&& func) { throwIfInvalid(); - core_->setCallback(std::move(func)); + core_->setCallback(std::forward(func)); } // unwrap @@ -106,21 +117,18 @@ Future::unwrap() { template template typename std::enable_if::type -Future::thenImplementation(F func, detail::argResult) { +Future::thenImplementation(F&& func, detail::argResult) { static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument"); typedef typename R::ReturnsFuture::Inner B; throwIfInvalid(); - // wrap these so we can move them into the lambda - folly::MoveWrapper> p; - folly::MoveWrapper funcm(std::forward(func)); + Promise p; + p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); // grab the Future now before we lose our handle on the Promise - auto f = p->getFuture(); - if (getExecutor()) { - f.setExecutor(getExecutor()); - } + auto f = p.getFuture(); + f.core_->setExecutorNoLock(getExecutor()); /* This is a bit tricky. @@ -141,9 +149,6 @@ Future::thenImplementation(F func, detail::argResult) { persist beyond the callback, if it gets moved), and so it is an optimization to just make it shared from the get-go. - We have to move in the Promise and func using the MoveWrapper - hack. (func could be copied but it's a big drag on perf). - Two subtle but important points about this design. detail::Core has no back pointers to Future or Promise, so if Future or Promise get moved (and they will be moved in performant code) we don't have to do @@ -154,16 +159,14 @@ Future::thenImplementation(F func, detail::argResult) { in some circumstances, but I think it should be explicit not implicit in the destruction of the Future used to create it. */ - setCallback_( - [p, funcm](Try&& t) mutable { - if (!isTry && t.hasException()) { - p->setException(std::move(t.exception())); - } else { - p->setWith([&]() { - return (*funcm)(t.template get()...); - }); - } - }); + setCallback_([ funcm = std::forward(func), pm = std::move(p) ]( + Try && t) mutable { + if (!isTry && t.hasException()) { + pm.setException(std::move(t.exception())); + } else { + pm.setWith([&]() { return funcm(t.template get()...); }); + } + }); return f; } @@ -173,40 +176,43 @@ Future::thenImplementation(F func, detail::argResult) { template template typename std::enable_if::type -Future::thenImplementation(F func, detail::argResult) { +Future::thenImplementation(F&& func, detail::argResult) { static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument"); typedef typename R::ReturnsFuture::Inner B; throwIfInvalid(); - // wrap these so we can move them into the lambda - folly::MoveWrapper> p; - folly::MoveWrapper funcm(std::forward(func)); + Promise p; + p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); // grab the Future now before we lose our handle on the Promise - auto f = p->getFuture(); - if (getExecutor()) { - f.setExecutor(getExecutor()); - } + auto f = p.getFuture(); + f.core_->setExecutorNoLock(getExecutor()); - setCallback_( - [p, funcm](Try&& t) mutable { + setCallback_([ funcm = std::forward(func), pm = std::move(p) ]( + Try && t) mutable { + auto ew = [&] { if (!isTry && t.hasException()) { - p->setException(std::move(t.exception())); + return std::move(t.exception()); } else { try { - auto f2 = (*funcm)(t.template get()...); + auto f2 = funcm(t.template get()...); // that didn't throw, now we can steal p - f2.setCallback_([p](Try&& b) mutable { - p->setTry(std::move(b)); + f2.setCallback_([p = std::move(pm)](Try && b) mutable { + p.setTry(std::move(b)); }); + return exception_wrapper(); } catch (const std::exception& e) { - p->setException(exception_wrapper(std::current_exception(), e)); + return exception_wrapper(std::current_exception(), e); } catch (...) { - p->setException(exception_wrapper(std::current_exception())); + return exception_wrapper(std::current_exception()); } } - }); + }(); + if (ew) { + pm.setException(std::move(ew)); + } + }); return f; } @@ -223,22 +229,21 @@ Future::then(R(Caller::*func)(Args...), Caller *instance) { }); } -// TODO(6838553) -#ifndef __clang__ template -template -auto Future::then(Executor* x, Args&&... args) - -> decltype(this->then(std::forward(args)...)) +template +auto Future::then(Executor* x, Arg&& arg, Args&&... args) + -> decltype(this->then(std::forward(arg), + std::forward(args)...)) { auto oldX = getExecutor(); setExecutor(x); - return this->then(std::forward(args)...).via(oldX); + return this->then(std::forward(arg), std::forward(args)...). + via(oldX); } -#endif template -Future Future::then() { - return then([] (Try&& t) {}); +Future Future::then() { + return then([] () {}); } // onError where the callback returns T @@ -255,16 +260,14 @@ Future::onError(F&& func) { "Return type of onError callback must be T or Future"); Promise p; + p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); auto f = p.getFuture(); - auto pm = folly::makeMoveWrapper(std::move(p)); - auto funcm = folly::makeMoveWrapper(std::move(func)); - setCallback_([pm, funcm](Try&& t) mutable { - if (!t.template withException([&] (Exn& e) { - pm->setWith([&]{ - return (*funcm)(e); - }); - })) { - pm->setTry(std::move(t)); + + setCallback_([ funcm = std::forward(func), pm = std::move(p) ]( + Try && t) mutable { + if (!t.template withException( + [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) { + pm.setTry(std::move(t)); } }); @@ -286,22 +289,28 @@ Future::onError(F&& func) { Promise p; auto f = p.getFuture(); - auto pm = folly::makeMoveWrapper(std::move(p)); - auto funcm = folly::makeMoveWrapper(std::move(func)); - setCallback_([pm, funcm](Try&& t) mutable { - if (!t.template withException([&] (Exn& e) { - try { - auto f2 = (*funcm)(e); - f2.setCallback_([pm](Try&& t2) mutable { - pm->setTry(std::move(t2)); - }); - } catch (const std::exception& e2) { - pm->setException(exception_wrapper(std::current_exception(), e2)); - } catch (...) { - pm->setException(exception_wrapper(std::current_exception())); + + setCallback_([ pm = std::move(p), funcm = std::forward(func) ]( + Try && t) mutable { + if (!t.template withException([&](Exn& e) { + auto ew = [&] { + try { + auto f2 = funcm(e); + f2.setCallback_([pm = std::move(pm)](Try && t2) mutable { + pm.setTry(std::move(t2)); + }); + return exception_wrapper(); + } catch (const std::exception& e2) { + return exception_wrapper(std::current_exception(), e2); + } catch (...) { + return exception_wrapper(std::current_exception()); + } + }(); + if (ew) { + pm.setException(std::move(ew)); } })) { - pm->setTry(std::move(t)); + pm.setTry(std::move(t)); } }); @@ -310,10 +319,9 @@ Future::onError(F&& func) { template template -Future Future::ensure(F func) { - MoveWrapper funcw(std::move(func)); - return this->then([funcw](Try&& t) { - (*funcw)(); +Future Future::ensure(F&& func) { + return this->then([funcw = std::forward(func)](Try && t) mutable { + funcw(); return makeFuture(std::move(t)); }); } @@ -321,17 +329,15 @@ Future Future::ensure(F func) { template template Future Future::onTimeout(Duration dur, F&& func, Timekeeper* tk) { - auto funcw = folly::makeMoveWrapper(std::forward(func)); - return within(dur, tk) - .onError([funcw](TimedOut const&) { return (*funcw)(); }); + return within(dur, tk).onError([funcw = std::forward(func)]( + TimedOut const&) { return funcw(); }); } template template -typename std::enable_if< - detail::callableWith::value && - detail::Extract::ReturnsFuture::value, - Future>::type +typename std::enable_if::value && + detail::Extract::ReturnsFuture::value, + Future>::type Future::onError(F&& func) { static_assert( std::is_same::Return, Future>::value, @@ -339,24 +345,29 @@ Future::onError(F&& func) { Promise p; auto f = p.getFuture(); - auto pm = folly::makeMoveWrapper(std::move(p)); - auto funcm = folly::makeMoveWrapper(std::move(func)); - setCallback_([pm, funcm](Try t) mutable { - if (t.hasException()) { - try { - auto f2 = (*funcm)(std::move(t.exception())); - f2.setCallback_([pm](Try t2) mutable { - pm->setTry(std::move(t2)); - }); - } catch (const std::exception& e2) { - pm->setException(exception_wrapper(std::current_exception(), e2)); - } catch (...) { - pm->setException(exception_wrapper(std::current_exception())); - } - } else { - pm->setTry(std::move(t)); - } - }); + setCallback_( + [ pm = std::move(p), funcm = std::forward(func) ](Try t) mutable { + if (t.hasException()) { + auto ew = [&] { + try { + auto f2 = funcm(std::move(t.exception())); + f2.setCallback_([pm = std::move(pm)](Try t2) mutable { + pm.setTry(std::move(t2)); + }); + return exception_wrapper(); + } catch (const std::exception& e2) { + return exception_wrapper(std::current_exception(), e2); + } catch (...) { + return exception_wrapper(std::current_exception()); + } + }(); + if (ew) { + pm.setException(std::move(ew)); + } + } else { + pm.setTry(std::move(t)); + } + }); return f; } @@ -375,17 +386,14 @@ Future::onError(F&& func) { Promise p; auto f = p.getFuture(); - auto pm = folly::makeMoveWrapper(std::move(p)); - auto funcm = folly::makeMoveWrapper(std::move(func)); - setCallback_([pm, funcm](Try t) mutable { - if (t.hasException()) { - pm->setWith([&]{ - return (*funcm)(std::move(t.exception())); + setCallback_( + [ pm = std::move(p), funcm = std::forward(func) ](Try t) mutable { + if (t.hasException()) { + pm.setWith([&] { return funcm(std::move(t.exception())); }); + } else { + pm.setTry(std::move(t)); + } }); - } else { - pm->setTry(std::move(t)); - } - }); return f; } @@ -411,6 +419,11 @@ Try& Future::getTry() { return core_->getTry(); } +template +Try& Future::getTryVia(DrivableExecutor* e) { + return waitVia(e).getTry(); +} + template Optional> Future::poll() { Optional> o; @@ -421,24 +434,30 @@ Optional> Future::poll() { } template -template -inline Future Future::via(Executor* executor) && { +inline Future Future::via(Executor* executor, int8_t priority) && { throwIfInvalid(); - setExecutor(executor); + setExecutor(executor, priority); return std::move(*this); } template -template -inline Future Future::via(Executor* executor) & { +inline Future Future::via(Executor* executor, int8_t priority) & { throwIfInvalid(); - MoveWrapper> p; - auto f = p->getFuture(); - then([p](Try&& t) mutable { p->setTry(std::move(t)); }); - return std::move(f).via(executor); + Promise p; + auto f = p.getFuture(); + then([p = std::move(p)](Try && t) mutable { p.setTry(std::move(t)); }); + return std::move(f).via(executor, priority); +} + +template +auto via(Executor* x, Func&& func) + -> Future::Inner> +{ + // TODO make this actually more performant. :-P #7260175 + return via(x).then(std::forward(func)); } template @@ -447,6 +466,16 @@ bool Future::isReady() const { return core_->ready(); } +template +bool Future::hasValue() { + return getTry().hasValue(); +} + +template +bool Future::hasException() { + return getTry().hasException(); +} + template void Future::raise(exception_wrapper exception) { core_->raise(std::move(exception)); @@ -456,98 +485,97 @@ void Future::raise(exception_wrapper exception) { template Future::type> makeFuture(T&& t) { - Promise::type> p; - p.setValue(std::forward(t)); - return p.getFuture(); + return makeFuture(Try::type>(std::forward(t))); } inline // for multiple translation units -Future makeFuture() { - Promise p; - p.setValue(); - return p.getFuture(); +Future makeFuture() { + return makeFuture(Unit{}); } +// makeFutureWith(Future()) -> Future template -auto makeFutureWith( - F&& func, - typename std::enable_if::value, bool>::type sdf) - -> Future { - Promise p; - p.setWith( - [&func]() { - return (func)(); - }); - return p.getFuture(); +typename std::enable_if::type>::value, + typename std::result_of::type>::type +makeFutureWith(F&& func) { + using InnerType = + typename isFuture::type>::Inner; + try { + return func(); + } catch (std::exception& e) { + return makeFuture( + exception_wrapper(std::current_exception(), e)); + } catch (...) { + return makeFuture(exception_wrapper(std::current_exception())); + } } +// makeFutureWith(T()) -> Future +// makeFutureWith(void()) -> Future template -auto makeFutureWith(F const& func) -> Future { - F copy = func; - return makeFutureWith(std::move(copy)); +typename std::enable_if< + !(isFuture::type>::value), + Future::type>::type>>::type +makeFutureWith(F&& func) { + using LiftedResult = + typename Unit::Lift::type>::type; + return makeFuture(makeTryWith([&func]() mutable { + return func(); + })); } template Future makeFuture(std::exception_ptr const& e) { - Promise p; - p.setException(e); - return p.getFuture(); + return makeFuture(Try(e)); } template Future makeFuture(exception_wrapper ew) { - Promise p; - p.setException(std::move(ew)); - return p.getFuture(); + return makeFuture(Try(std::move(ew))); } template typename std::enable_if::value, Future>::type makeFuture(E const& e) { - Promise p; - p.setException(make_exception_wrapper(e)); - return p.getFuture(); + return makeFuture(Try(make_exception_wrapper(e))); } template Future makeFuture(Try&& t) { - Promise::type> p; - p.setTry(std::move(t)); - return p.getFuture(); + return Future(new detail::Core(std::move(t))); } -template <> -inline Future makeFuture(Try&& t) { - if (t.hasException()) { - return makeFuture(std::move(t.exception())); - } else { - return makeFuture(); - } +// via +Future via(Executor* executor, int8_t priority) { + return makeFuture().via(executor, priority); } -// via -template -Future via(Executor* executor) { - return makeFuture().via(executor); +// mapSetCallback calls func(i, Try) when every future completes + +template +void mapSetCallback(InputIterator first, InputIterator last, F func) { + for (size_t i = 0; first != last; ++first, ++i) { + first->setCallback_([func, i](Try&& t) { + func(i, std::move(t)); + }); + } } -// when (variadic) +// collectAll (variadic) template -typename detail::VariadicContext< +typename detail::CollectAllVariadicContext< typename std::decay::type::value_type...>::type collectAll(Fs&&... fs) { - auto ctx = - new detail::VariadicContext::type::value_type...>(); - ctx->total = sizeof...(fs); - auto f_saved = ctx->p.getFuture(); - detail::collectAllVariadicHelper(ctx, - std::forward::type>(fs)...); - return f_saved; + auto ctx = std::make_shared::type::value_type...>>(); + detail::collectVariadicHelper( + ctx, std::forward::type>(fs)...); + return ctx->p.getFuture(); } -// when (iterator) +// collectAll (iterator) template Future< @@ -557,184 +585,159 @@ collectAll(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - if (first >= last) { - return makeFuture(std::vector>()); - } - size_t n = std::distance(first, last); - - auto ctx = new detail::WhenAllContext(); - - ctx->results.resize(n); - - auto f_saved = ctx->p.getFuture(); - - for (size_t i = 0; first != last; ++first, ++i) { - assert(i < n); - auto& f = *first; - f.setCallback_([ctx, i, n](Try t) { - ctx->results[i] = std::move(t); - if (++ctx->count == n) { - ctx->p.setValue(std::move(ctx->results)); - delete ctx; - } - }); - } + struct CollectAllContext { + CollectAllContext(size_t n) : results(n) {} + ~CollectAllContext() { + p.setValue(std::move(results)); + } + Promise>> p; + std::vector> results; + }; - return f_saved; + auto ctx = + std::make_shared(size_t(std::distance(first, last))); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + ctx->results[i] = std::move(t); + }); + return ctx->p.getFuture(); } -namespace detail { +// collect (iterator) -template struct CollectContextHelper; - -template -struct CollectContextHelper::value>::type> { - static inline std::vector&& getResults(std::vector& results) { - return std::move(results); - } -}; - -template -struct CollectContextHelper::value>::type> { - static inline std::vector getResults(std::vector& results) { - std::vector finalResults; - finalResults.reserve(results.size()); - for (auto& opt : results) { - finalResults.push_back(std::move(opt.value())); - } - return finalResults; - } -}; +namespace detail { template struct CollectContext { + struct Nothing { + explicit Nothing(int /* n */) {} + }; - typedef typename std::conditional< - std::is_default_constructible::value, - T, - Optional - >::type VecT; - - explicit CollectContext(int n) : count(0), threw(false) { - results.resize(n); - } - - Promise> p; - std::vector results; - std::atomic count; - std::atomic_bool threw; - - typedef std::vector result_type; - - static inline Future> makeEmptyFuture() { - return makeFuture(std::vector()); - } - - inline void setValue() { - p.setValue(CollectContextHelper::getResults(results)); - } - - inline void addResult(int i, Try& t) { - results[i] = std::move(t.value()); - } -}; - -template <> -struct CollectContext { - - explicit CollectContext(int n) : count(0), threw(false) {} - - Promise p; - std::atomic count; - std::atomic_bool threw; - - typedef void result_type; - - static inline Future makeEmptyFuture() { - return makeFuture(); - } - - inline void setValue() { - p.setValue(); + using Result = typename std::conditional< + std::is_void::value, + void, + std::vector>::type; + + using InternalResult = typename std::conditional< + std::is_void::value, + Nothing, + std::vector>>::type; + + explicit CollectContext(size_t n) : result(n) {} + ~CollectContext() { + if (!threw.exchange(true)) { + // map Optional -> T + std::vector finalResult; + finalResult.reserve(result.size()); + std::transform(result.begin(), result.end(), + std::back_inserter(finalResult), + [](Optional& o) { return std::move(o.value()); }); + p.setValue(std::move(finalResult)); + } } - - inline void addResult(int i, Try& t) { - // do nothing + inline void setPartialResult(size_t i, Try& t) { + result[i] = std::move(t.value()); } + Promise p; + InternalResult result; + std::atomic threw {false}; }; -} // detail +} template Future::value_type::value_type ->::result_type> + typename std::iterator_traits::value_type::value_type>::Result> collect(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - if (first >= last) { - return detail::CollectContext::makeEmptyFuture(); - } - - size_t n = std::distance(first, last); - auto ctx = new detail::CollectContext(n); - auto f_saved = ctx->p.getFuture(); - - for (size_t i = 0; first != last; ++first, ++i) { - assert(i < n); - auto& f = *first; - f.setCallback_([ctx, i, n](Try t) { - auto c = ++ctx->count; - - if (t.hasException()) { - if (!ctx->threw.exchange(true)) { - ctx->p.setException(std::move(t.exception())); - } - } else if (!ctx->threw) { - ctx->addResult(i, t); - if (c == n) { - ctx->setValue(); - } + auto ctx = std::make_shared>( + std::distance(first, last)); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + if (t.hasException()) { + if (!ctx->threw.exchange(true)) { + ctx->p.setException(std::move(t.exception())); } + } else if (!ctx->threw) { + ctx->setPartialResult(i, t); + } + }); + return ctx->p.getFuture(); +} - if (c == n) { - delete ctx; - } - }); - } +// collect (variadic) - return f_saved; +template +typename detail::CollectVariadicContext< + typename std::decay::type::value_type...>::type +collect(Fs&&... fs) { + auto ctx = std::make_shared::type::value_type...>>(); + detail::collectVariadicHelper( + ctx, std::forward::type>(fs)...); + return ctx->p.getFuture(); } +// collectAny (iterator) + template Future< std::pair::value_type::value_type> > > + std::iterator_traits::value_type::value_type>>> collectAny(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - auto ctx = new detail::WhenAnyContext(std::distance(first, last)); - auto f_saved = ctx->p.getFuture(); + struct CollectAnyContext { + CollectAnyContext() {} + Promise>> p; + std::atomic done {false}; + }; - for (size_t i = 0; first != last; first++, i++) { - auto& f = *first; - f.setCallback_([i, ctx](Try&& t) { - if (!ctx->done.exchange(true)) { - ctx->p.setValue(std::make_pair(i, std::move(t))); - } - ctx->decref(); - }); - } + auto ctx = std::make_shared(); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + if (!ctx->done.exchange(true)) { + ctx->p.setValue(std::make_pair(i, std::move(t))); + } + }); + return ctx->p.getFuture(); +} + +// collectAnyWithoutException (iterator) - return f_saved; +template +Future::value_type::value_type>> +collectAnyWithoutException(InputIterator first, InputIterator last) { + typedef + typename std::iterator_traits::value_type::value_type T; + + struct CollectAnyWithoutExceptionContext { + CollectAnyWithoutExceptionContext(){} + Promise> p; + std::atomic done{false}; + std::atomic nFulfilled{0}; + size_t nTotal; + }; + + auto ctx = std::make_shared(); + ctx->nTotal = size_t(std::distance(first, last)); + + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + if (!t.hasException() && !ctx->done.exchange(true)) { + ctx->p.setValue(std::make_pair(i, std::move(t.value()))); + } else if (++ctx->nFulfilled == ctx->nTotal) { + ctx->p.setException(t.exception()); + } + }); + return ctx->p.getFuture(); } +// collectN (iterator) + template Future::value_type::value_type>>>> @@ -743,90 +746,190 @@ collectN(InputIterator first, InputIterator last, size_t n) { std::iterator_traits::value_type::value_type T; typedef std::vector>> V; - struct ctx_t { + struct CollectNContext { V v; - size_t completed; + std::atomic completed = {0}; Promise p; }; - auto ctx = std::make_shared(); - ctx->completed = 0; - - // for each completed Future, increase count and add to vector, until we - // have n completed futures at which point we fulfill our Promise with the - // vector - auto it = first; - size_t i = 0; - while (it != last) { - it->then([ctx, n, i](Try&& t) { - auto& v = ctx->v; + auto ctx = std::make_shared(); + + if (size_t(std::distance(first, last)) < n) { + ctx->p.setException(std::runtime_error("Not enough futures")); + } else { + // for each completed Future, increase count and add to vector, until we + // have n completed futures at which point we fulfil our Promise with the + // vector + mapSetCallback(first, last, [ctx, n](size_t i, Try&& t) { auto c = ++ctx->completed; if (c <= n) { assert(ctx->v.size() < n); - v.push_back(std::make_pair(i, std::move(t))); + ctx->v.emplace_back(i, std::move(t)); if (c == n) { - ctx->p.setTry(Try(std::move(v))); + ctx->p.setTry(Try(std::move(ctx->v))); } } }); - - it++; - i++; - } - - if (i < n) { - ctx->p.setException(std::runtime_error("Not enough futures")); } return ctx->p.getFuture(); } -template -typename std::enable_if::value, Future>::type -reduce(It first, It last, T initial, F func) { +// reduce (iterator) + +template +Future reduce(It first, It last, T&& initial, F&& func) { if (first == last) { return makeFuture(std::move(initial)); } + typedef typename std::iterator_traits::value_type::value_type ItT; + typedef + typename std::conditional&&>::value, + Try, + ItT>::type Arg; typedef isTry IsTry; - return collectAll(first, last) - .then([initial, func](std::vector>& vals) mutable { - for (auto& val : vals) { - initial = func(std::move(initial), - // Either return a ItT&& or a Try&& depending - // on the type of the argument of func. - val.template get()); - } - return initial; + auto sfunc = std::make_shared(std::move(func)); + + auto f = first->then( + [ minitial = std::move(initial), sfunc ](Try & head) mutable { + return (*sfunc)( + std::move(minitial), head.template get()); + }); + + for (++first; first != last; ++first) { + f = collectAll(f, *first).then([sfunc](std::tuple, Try>& t) { + return (*sfunc)(std::move(std::get<0>(t).value()), + // Either return a ItT&& or a Try&& depending + // on the type of the argument of func. + std::get<1>(t).template get()); }); + } + + return f; +} + +// window (collection) + +template +std::vector> +window(Collection input, F func, size_t n) { + struct WindowContext { + WindowContext(Collection&& i, F&& fn) + : input_(std::move(i)), promises_(input_.size()), + func_(std::move(fn)) + {} + std::atomic i_ {0}; + Collection input_; + std::vector> promises_; + F func_; + + static inline void spawn(const std::shared_ptr& ctx) { + size_t i = ctx->i_++; + if (i < ctx->input_.size()) { + // Using setCallback_ directly since we don't need the Future + ctx->func_(std::move(ctx->input_[i])).setCallback_( + // ctx is captured by value + [ctx, i](Try&& t) { + ctx->promises_[i].setTry(std::move(t)); + // Chain another future onto this one + spawn(std::move(ctx)); + }); + } + } + }; + + auto max = std::min(n, input.size()); + + auto ctx = std::make_shared( + std::move(input), std::move(func)); + + for (size_t i = 0; i < max; ++i) { + // Start the first n Futures + WindowContext::spawn(ctx); + } + + std::vector> futures; + futures.reserve(ctx->promises_.size()); + for (auto& promise : ctx->promises_) { + futures.emplace_back(promise.getFuture()); + } + + return futures; } +// reduce + +template +template +Future Future::reduce(I&& initial, F&& func) { + return then([ + minitial = std::forward(initial), + mfunc = std::forward(func) + ](T& vals) mutable { + auto ret = std::move(minitial); + for (auto& val : vals) { + ret = mfunc(std::move(ret), std::move(val)); + } + return ret; + }); +} + +// unorderedReduce (iterator) + template -typename std::enable_if::value, Future>::type -reduce(It first, It last, T initial, F func) { +Future unorderedReduce(It first, It last, T initial, F func) { if (first == last) { return makeFuture(std::move(initial)); } typedef isTry IsTry; - auto f = first->then([initial, func](Try& head) mutable { - return func(std::move(initial), - head.template get()); - }); + struct UnorderedReduceContext { + UnorderedReduceContext(T&& memo, F&& fn, size_t n) + : lock_(), memo_(makeFuture(std::move(memo))), + func_(std::move(fn)), numThens_(0), numFutures_(n), promise_() + {} + folly::MicroSpinLock lock_; // protects memo_ and numThens_ + Future memo_; + F func_; + size_t numThens_; // how many Futures completed and called .then() + size_t numFutures_; // how many Futures in total + Promise promise_; + }; - for (++first; first != last; ++first) { - f = collectAll(f, *first).then([func](std::tuple, Try>& t) { - return func(std::move(std::get<0>(t).value()), - // Either return a ItT&& or a Try&& depending - // on the type of the argument of func. - std::get<1>(t).template get()); - }); - } + auto ctx = std::make_shared( + std::move(initial), std::move(func), std::distance(first, last)); + + mapSetCallback( + first, + last, + [ctx](size_t /* i */, Try&& t) { + // Futures can be completed in any order, simultaneously. + // To make this non-blocking, we create a new Future chain in + // the order of completion to reduce the values. + // The spinlock just protects chaining a new Future, not actually + // executing the reduce, which should be really fast. + folly::MSLGuard lock(ctx->lock_); + ctx->memo_ = + ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable { + // Either return a ItT&& or a Try&& depending + // on the type of the argument of func. + return ctx->func_(std::move(v), + mt.template get()); + }); + if (++ctx->numThens_ == ctx->numFutures_) { + // After reducing the value of the last Future, fulfill the Promise + ctx->memo_.setCallback_( + [ctx](Try&& t2) { ctx->promise_.setValue(std::move(t2)); }); + } + }); - return f; + return ctx->promise_.getFuture(); } +// within + template Future Future::within(Duration dur, Timekeeper* tk) { return within(dur, TimedOut(), tk); @@ -837,41 +940,49 @@ template Future Future::within(Duration dur, E e, Timekeeper* tk) { struct Context { - Context(E ex) : exception(std::move(ex)), promise(), token(false) {} + Context(E ex) : exception(std::move(ex)), promise() {} E exception; + Future thisFuture; Promise promise; - std::atomic token; + std::atomic token {false}; }; - auto ctx = std::make_shared(std::move(e)); + std::shared_ptr tks; if (!tk) { - tk = folly::detail::getTimekeeperSingleton(); + tks = folly::detail::getTimekeeperSingleton(); + tk = DCHECK_NOTNULL(tks.get()); } - tk->after(dur) - .then([ctx](Try const& t) { - if (ctx->token.exchange(true) == false) { - if (t.hasException()) { - ctx->promise.setException(std::move(t.exception())); - } else { - ctx->promise.setException(std::move(ctx->exception)); - } - } - }); + auto ctx = std::make_shared(std::move(e)); - this->then([ctx](Try&& t) { + ctx->thisFuture = this->then([ctx](Try&& t) mutable { + // TODO: "this" completed first, cancel "after" if (ctx->token.exchange(true) == false) { ctx->promise.setTry(std::move(t)); } }); - return ctx->promise.getFuture(); + tk->after(dur).then([ctx](Try const& t) mutable { + // "after" completed first, cancel "this" + ctx->thisFuture.raise(TimedOut()); + if (ctx->token.exchange(true) == false) { + if (t.hasException()) { + ctx->promise.setException(std::move(t.exception())); + } else { + ctx->promise.setException(std::move(ctx->exception)); + } + } + }); + + return ctx->promise.getFuture().via(getExecutor()); } +// delayed + template Future Future::delayed(Duration dur, Timekeeper* tk) { return collectAll(*this, futures::sleep(dur, tk)) - .then([](std::tuple, Try> tup) { + .then([](std::tuple, Try> tup) { Try& t = std::get<0>(tup); return makeFuture(std::move(t)); }); @@ -884,47 +995,44 @@ void waitImpl(Future& f) { // short-circuit if there's nothing to do if (f.isReady()) return; - folly::fibers::Baton baton; - f = f.then([&](Try t) { - baton.post(); - return makeFuture(std::move(t)); - }); + FutureBatonType baton; + f.setCallback_([&](const Try& /* t */) { baton.post(); }); baton.wait(); - - // There's a race here between the return here and the actual finishing of - // the future. f is completed, but the setup may not have finished on done - // after the baton has posted. - while (!f.isReady()) { - std::this_thread::yield(); - } + assert(f.isReady()); } template void waitImpl(Future& f, Duration dur) { // short-circuit if there's nothing to do - if (f.isReady()) return; + if (f.isReady()) { + return; + } - auto baton = std::make_shared(); - f = f.then([baton](Try t) { + Promise promise; + auto ret = promise.getFuture(); + auto baton = std::make_shared(); + f.setCallback_([ baton, promise = std::move(promise) ](Try && t) mutable { + promise.setTry(std::move(t)); baton->post(); - return makeFuture(std::move(t)); }); - - // Let's preserve the invariant that if we did not timeout (timed_wait returns - // true), then the returned Future is complete when it is returned to the - // caller. We need to wait out the race for that Future to complete. + f = std::move(ret); if (baton->timed_wait(dur)) { - while (!f.isReady()) { - std::this_thread::yield(); - } + assert(f.isReady()); } } template void waitViaImpl(Future& f, DrivableExecutor* e) { + // Set callback so to ensure that the via executor has something on it + // so that once the preceding future triggers this callback, drive will + // always have a callback to satisfy it + if (f.isReady()) + return; + f = f.via(e).then([](T&& t) { return std::move(t); }); while (!f.isReady()) { e->drive(); } + assert(f.isReady()); } } // detail @@ -970,11 +1078,6 @@ T Future::get() { return std::move(wait().value()); } -template <> -inline void Future::get() { - wait().value(); -} - template T Future::get(Duration dur) { wait(dur); @@ -985,31 +1088,25 @@ T Future::get(Duration dur) { } } -template <> -inline void Future::get(Duration dur) { - wait(dur); - if (isReady()) { - return; - } else { - throw TimedOut(); - } -} - template T Future::getVia(DrivableExecutor* e) { return std::move(waitVia(e).value()); } -template <> -inline void Future::getVia(DrivableExecutor* e) { - waitVia(e).value(); +namespace detail { + template + struct TryEquals { + static bool equals(const Try& t1, const Try& t2) { + return t1.value() == t2.value(); + } + }; } template Future Future::willEqual(Future& f) { return collectAll(*this, f).then([](const std::tuple, Try>& t) { if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) { - return std::get<0>(t).value() == std::get<1>(t).value(); + return detail::TryEquals::equals(std::get<0>(t), std::get<1>(t)); } else { return false; } @@ -1018,41 +1115,85 @@ Future Future::willEqual(Future& f) { template template -Future Future::filter(F predicate) { - auto p = folly::makeMoveWrapper(std::move(predicate)); - return this->then([p](T val) { +Future Future::filter(F&& predicate) { + return this->then([p = std::forward(predicate)](T val) { T const& valConstRef = val; - if (!(*p)(valConstRef)) { + if (!p(valConstRef)) { throw PredicateDoesNotObtain(); } return val; }); } -namespace futures { - namespace { - template - Future chainHelper(Future f) { - return f; - } +template +template +auto Future::thenMulti(Callback&& fn) + -> decltype(this->then(std::forward(fn))) { + // thenMulti with one callback is just a then + return then(std::forward(fn)); +} - template - Future chainHelper(F f, Fn fn, Callbacks... fns) { - return chainHelper(f.then(fn), fns...); - } - } +template +template +auto Future::thenMulti(Callback&& fn, Callbacks&&... fns) + -> decltype(this->then(std::forward(fn)). + thenMulti(std::forward(fns)...)) { + // thenMulti with two callbacks is just then(a).thenMulti(b, ...) + return then(std::forward(fn)). + thenMulti(std::forward(fns)...); +} + +template +template +auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn, + Callbacks&&... fns) + -> decltype(this->then(std::forward(fn)). + thenMulti(std::forward(fns)...)) { + // thenMultiExecutor with two callbacks is + // via(x).then(a).thenMulti(b, ...).via(oldX) + auto oldX = getExecutor(); + setExecutor(x); + return then(std::forward(fn)). + thenMulti(std::forward(fns)...).via(oldX); +} - template - std::function(Try)> - chain(Callbacks... fns) { - MoveWrapper> pw; - MoveWrapper> fw(chainHelper(pw->getFuture(), fns...)); - return [=](Try t) mutable { - pw->setTry(std::move(t)); - return std::move(*fw); - }; +template +template +auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn) + -> decltype(this->then(std::forward(fn))) { + // thenMulti with one callback is just a then with an executor + return then(x, std::forward(fn)); +} + +template +inline Future when(bool p, F&& thunk) { + return p ? std::forward(thunk)().unit() : makeFuture(); +} + +template +Future whileDo(P&& predicate, F&& thunk) { + if (predicate()) { + auto future = thunk(); + return future.then([ + predicate = std::forward

(predicate), + thunk = std::forward(thunk) + ]() mutable { + return whileDo(std::forward

(predicate), std::forward(thunk)); + }); } + return makeFuture(); +} +template +Future times(const int n, F&& thunk) { + return folly::whileDo( + [ n, count = folly::make_unique>(0) ]() mutable { + return count->fetch_add(1) < n; + }, + std::forward(thunk)); +} + +namespace futures { template std::vector> map(It first, It last, F func) { std::vector> results; @@ -1063,9 +1204,212 @@ namespace futures { } } -} // namespace folly +namespace futures { + +namespace detail { + +struct retrying_policy_raw_tag {}; +struct retrying_policy_fut_tag {}; + +template +struct retrying_policy_traits { + using ew = exception_wrapper; + FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator()); + template + using has_op = typename std::integral_constant::value || + has_op_call::value>; + using is_raw = has_op; + using is_fut = has_op>; + using tag = typename std::conditional< + is_raw::value, retrying_policy_raw_tag, typename std::conditional< + is_fut::value, retrying_policy_fut_tag, void>::type>::type; +}; -// I haven't included a Future specialization because I don't forsee us -// using it, however it is not difficult to add when needed. Refer to -// Future for guidance. std::future and boost::future code would also be -// instructive. +template +typename std::result_of::type +retrying(size_t k, Policy&& p, FF&& ff) { + using F = typename std::result_of::type; + using T = typename F::value_type; + auto f = ff(k++); + return f.onError( + [ k, pm = std::forward(p), ffm = std::forward(ff) ]( + exception_wrapper x) mutable { + auto q = pm(k, x); + return q.then( + [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ]( + bool r) mutable { + return r ? retrying(k, std::move(pm), std::move(ffm)) + : makeFuture(std::move(xm)); + }); + }); +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) { + auto q = [pm = std::forward(p)](size_t k, exception_wrapper x) { + return makeFuture(pm(k, x)); + }; + return retrying(0, std::move(q), std::forward(ff)); +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) { + return retrying(0, std::forward(p), std::forward(ff)); +} + +// jittered exponential backoff, clamped to [backoff_min, backoff_max] +template +Duration retryingJitteredExponentialBackoffDur( + size_t n, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG& rng) { + using d = Duration; + auto dist = std::normal_distribution(0.0, jitter_param); + auto jitter = std::exp(dist(rng)); + auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1))); + return std::max(backoff_min, std::min(backoff_max, backoff)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG&& rng, + Policy&& p) { + return [ + pm = std::forward(p), + max_tries, + backoff_min, + backoff_max, + jitter_param, + rngp = std::forward(rng) + ](size_t n, const exception_wrapper& ex) mutable { + if (n == max_tries) { + return makeFuture(false); + } + return pm(n, ex).then( + [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ]( + bool v) mutable { + if (!v) { + return makeFuture(false); + } + auto backoff = detail::retryingJitteredExponentialBackoffDur( + n, backoff_min, backoff_max, jitter_param, rngp); + return futures::sleep(backoff).then([] { return true; }); + }); + }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG&& rng, + Policy&& p, + retrying_policy_raw_tag) { + auto q = [pm = std::forward(p)]( + size_t n, const exception_wrapper& e) { + return makeFuture(pm(n, e)); + }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::forward(rng), + std::move(q)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG&& rng, + Policy&& p, + retrying_policy_fut_tag) { + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::forward(rng), + std::forward(p)); +} +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retrying(std::forward(p), std::forward(ff), tag()); +} + +inline +std::function +retryingPolicyBasic( + size_t max_tries) { + return [=](size_t n, const exception_wrapper&) { return n < max_tries; }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG&& rng, + Policy&& p) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::forward(rng), + std::forward(p), + tag()); +} + +inline +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param) { + auto p = [](size_t, const exception_wrapper&) { return true; }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + ThreadLocalPRNG(), + std::move(p)); +} + +} + +// Instantiate the most common Future types to save compile time +extern template class Future; +extern template class Future; +extern template class Future; +extern template class Future; +extern template class Future; +extern template class Future; + +} // namespace folly