X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Ffutures%2FFuture-inl.h;h=091357ee5b23a796c14631cd2fcf001047b7e704;hb=a1614feea3f3c0beb75fb2dc43ec45b3e5d57223;hp=d315b5055e46131e5c0a7675326b38e3da6e4c0f;hpb=3651364b70406ff1c0c2ed2214777038cf1fe526;p=folly.git diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index d315b505..091357ee 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -16,20 +16,39 @@ #pragma once +#include +#include #include +#include #include - -#include +#include #include +#include +#include #include #include +#if defined(__ANDROID__) || defined(__APPLE__) +#define FOLLY_FUTURE_USING_FIBER 0 +#else +#define FOLLY_FUTURE_USING_FIBER 1 +#include +#endif + namespace folly { class Timekeeper; namespace detail { - Timekeeper* getTimekeeperSingleton(); +#if FOLLY_FUTURE_USING_FIBER +typedef folly::fibers::Baton FutureBatonType; +#else +typedef folly::Baton<> FutureBatonType; +#endif +} + +namespace detail { + std::shared_ptr getTimekeeperSingleton(); } template @@ -45,23 +64,13 @@ Future& Future::operator=(Future&& other) noexcept { template template -Future::Future(T2&& val) : core_(nullptr) { - Promise p; - p.setValue(std::forward(val)); - *this = p.getFuture(); -} +Future::Future(T2&& val) + : core_(new detail::Core(Try(std::forward(val)))) {} template -template ::value, - int>::type> -Future::Future() : core_(nullptr) { - Promise p; - p.setValue(); - *this = p.getFuture(); -} - +template +Future::Future() + : core_(new detail::Core(Try(T()))) {} template Future::~Future() { @@ -116,13 +125,12 @@ Future::thenImplementation(F func, detail::argResult) { // wrap these so we can move them into the lambda folly::MoveWrapper> p; + p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); folly::MoveWrapper funcm(std::forward(func)); // grab the Future now before we lose our handle on the Promise auto f = p->getFuture(); - if (getExecutor()) { - f.setExecutor(getExecutor()); - } + f.core_->setExecutorNoLock(getExecutor()); /* This is a bit tricky. @@ -183,13 +191,12 @@ Future::thenImplementation(F func, detail::argResult) { // wrap these so we can move them into the lambda folly::MoveWrapper> p; + p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); folly::MoveWrapper funcm(std::forward(func)); // grab the Future now before we lose our handle on the Promise auto f = p->getFuture(); - if (getExecutor()) { - f.setExecutor(getExecutor()); - } + f.core_->setExecutorNoLock(getExecutor()); setCallback_( [p, funcm](Try&& t) mutable { @@ -238,8 +245,8 @@ auto Future::then(Executor* x, Arg&& arg, Args&&... args) } template -Future Future::then() { - return then([] (Try&& t) {}); +Future Future::then() { + return then([] () {}); } // onError where the callback returns T @@ -256,6 +263,7 @@ Future::onError(F&& func) { "Return type of onError callback must be T or Future"); Promise p; + p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); auto f = p.getFuture(); auto pm = folly::makeMoveWrapper(std::move(p)); auto funcm = folly::makeMoveWrapper(std::move(func)); @@ -313,7 +321,7 @@ template template Future Future::ensure(F func) { MoveWrapper funcw(std::move(func)); - return this->then([funcw](Try&& t) { + return this->then([funcw](Try&& t) mutable { (*funcw)(); return makeFuture(std::move(t)); }); @@ -440,12 +448,31 @@ inline Future Future::via(Executor* executor, int8_t priority) & { return std::move(f).via(executor, priority); } + +template +auto via(Executor* x, Func func) + -> Future::Inner> +{ + // TODO make this actually more performant. :-P #7260175 + return via(x).then(func); +} + template bool Future::isReady() const { throwIfInvalid(); return core_->ready(); } +template +bool Future::hasValue() { + return getTry().hasValue(); +} + +template +bool Future::hasException() { + return getTry().hasException(); +} + template void Future::raise(exception_wrapper exception) { core_->raise(std::move(exception)); @@ -455,78 +482,69 @@ void Future::raise(exception_wrapper exception) { template Future::type> makeFuture(T&& t) { - Promise::type> p; - p.setValue(std::forward(t)); - return p.getFuture(); + return makeFuture(Try::type>(std::forward(t))); } inline // for multiple translation units -Future makeFuture() { - Promise p; - p.setValue(); - return p.getFuture(); +Future makeFuture() { + return makeFuture(Unit{}); } +// makeFutureWith(Future()) -> Future template -auto makeFutureWith( - F&& func, - typename std::enable_if::value, bool>::type sdf) - -> Future { - Promise p; - p.setWith( - [&func]() { - return (func)(); - }); - return p.getFuture(); +typename std::enable_if::type>::value, + typename std::result_of::type>::type +makeFutureWith(F&& func) { + using InnerType = + typename isFuture::type>::Inner; + try { + return func(); + } catch (std::exception& e) { + return makeFuture( + exception_wrapper(std::current_exception(), e)); + } catch (...) { + return makeFuture(exception_wrapper(std::current_exception())); + } } +// makeFutureWith(T()) -> Future +// makeFutureWith(void()) -> Future template -auto makeFutureWith(F const& func) -> Future { - F copy = func; - return makeFutureWith(std::move(copy)); +typename std::enable_if< + !(isFuture::type>::value), + Future::type>::type>>::type +makeFutureWith(F&& func) { + using LiftedResult = + typename Unit::Lift::type>::type; + return makeFuture(makeTryWith([&func]() mutable { + return func(); + })); } template Future makeFuture(std::exception_ptr const& e) { - Promise p; - p.setException(e); - return p.getFuture(); + return makeFuture(Try(e)); } template Future makeFuture(exception_wrapper ew) { - Promise p; - p.setException(std::move(ew)); - return p.getFuture(); + return makeFuture(Try(std::move(ew))); } template typename std::enable_if::value, Future>::type makeFuture(E const& e) { - Promise p; - p.setException(make_exception_wrapper(e)); - return p.getFuture(); + return makeFuture(Try(make_exception_wrapper(e))); } template Future makeFuture(Try&& t) { - Promise::type> p; - p.setTry(std::move(t)); - return p.getFuture(); -} - -template <> -inline Future makeFuture(Try&& t) { - if (t.hasException()) { - return makeFuture(std::move(t.exception())); - } else { - return makeFuture(); - } + return Future(new detail::Core(std::move(t))); } // via -Future via(Executor* executor, int8_t priority) { +Future via(Executor* executor, int8_t priority) { return makeFuture().via(executor, priority); } @@ -544,13 +562,13 @@ void mapSetCallback(InputIterator first, InputIterator last, F func) { // collectAll (variadic) template -typename detail::VariadicContext< +typename detail::CollectAllVariadicContext< typename std::decay::type::value_type...>::type collectAll(Fs&&... fs) { - auto ctx = std::make_shared::type::value_type...>>(); - detail::collectAllVariadicHelper(ctx, - std::forward::type>(fs)...); + detail::collectVariadicHelper( + ctx, std::forward::type>(fs)...); return ctx->p.getFuture(); } @@ -580,11 +598,15 @@ collectAll(InputIterator first, InputIterator last) { return ctx->p.getFuture(); } +// collect (iterator) + namespace detail { template struct CollectContext { - struct Nothing { explicit Nothing(int n) {} }; + struct Nothing { + explicit Nothing(int /* n */) {} + }; using Result = typename std::conditional< std::is_void::value, @@ -613,17 +635,9 @@ struct CollectContext { } Promise p; InternalResult result; - std::atomic threw; + std::atomic threw {false}; }; -// Specialize for void (implementations in Future.cpp) - -template <> -CollectContext::~CollectContext(); - -template <> -void CollectContext::setPartialResult(size_t i, Try& t); - } template @@ -647,6 +661,21 @@ collect(InputIterator first, InputIterator last) { return ctx->p.getFuture(); } +// collect (variadic) + +template +typename detail::CollectVariadicContext< + typename std::decay::type::value_type...>::type +collect(Fs&&... fs) { + auto ctx = std::make_shared::type::value_type...>>(); + detail::collectVariadicHelper( + ctx, std::forward::type>(fs)...); + return ctx->p.getFuture(); +} + +// collectAny (iterator) + template Future< std::pair::value_type::value_type T; struct CollectAnyContext { - CollectAnyContext(size_t n) : done(false) {}; + CollectAnyContext() {}; Promise>> p; - std::atomic done; + std::atomic done {false}; }; - auto ctx = std::make_shared(std::distance(first, last)); + auto ctx = std::make_shared(); mapSetCallback(first, last, [ctx](size_t i, Try&& t) { if (!ctx->done.exchange(true)) { ctx->p.setValue(std::make_pair(i, std::move(t))); @@ -672,6 +701,8 @@ collectAny(InputIterator first, InputIterator last) { return ctx->p.getFuture(); } +// collectN (iterator) + template Future::value_type::value_type>>>> @@ -687,7 +718,7 @@ collectN(InputIterator first, InputIterator last, size_t n) { }; auto ctx = std::make_shared(); - if (std::distance(first, last) < n) { + if (size_t(std::distance(first, last)) < n) { ctx->p.setException(std::runtime_error("Not enough futures")); } else { // for each completed Future, increase count and add to vector, until we @@ -697,7 +728,7 @@ collectN(InputIterator first, InputIterator last, size_t n) { auto c = ++ctx->completed; if (c <= n) { assert(ctx->v.size() < n); - ctx->v.push_back(std::make_pair(i, std::move(t))); + ctx->v.emplace_back(i, std::move(t)); if (c == n) { ctx->p.setTry(Try(std::move(ctx->v))); } @@ -708,6 +739,8 @@ collectN(InputIterator first, InputIterator last, size_t n) { return ctx->p.getFuture(); } +// reduce (iterator) + template Future reduce(It first, It last, T&& initial, F&& func) { if (first == last) { @@ -739,15 +772,17 @@ Future reduce(It first, It last, T&& initial, F&& func) { return f; } +// window (collection) + template std::vector> window(Collection input, F func, size_t n) { struct WindowContext { WindowContext(Collection&& i, F&& fn) - : i_(0), input_(std::move(i)), promises_(input_.size()), + : input_(std::move(i)), promises_(input_.size()), func_(std::move(fn)) {} - std::atomic i_; + std::atomic i_ {0}; Collection input_; std::vector> promises_; F func_; @@ -786,6 +821,8 @@ window(Collection input, F func, size_t n) { return futures; } +// reduce + template template Future Future::reduce(I&& initial, F&& func) { @@ -800,6 +837,8 @@ Future Future::reduce(I&& initial, F&& func) { }); } +// unorderedReduce (iterator) + template Future unorderedReduce(It first, It last, T initial, F func) { if (first == last) { @@ -824,30 +863,35 @@ Future unorderedReduce(It first, It last, T initial, F func) { auto ctx = std::make_shared( std::move(initial), std::move(func), std::distance(first, last)); - mapSetCallback(first, last, [ctx](size_t i, Try&& t) { - folly::MoveWrapper> mt(std::move(t)); - // Futures can be completed in any order, simultaneously. - // To make this non-blocking, we create a new Future chain in - // the order of completion to reduce the values. - // The spinlock just protects chaining a new Future, not actually - // executing the reduce, which should be really fast. - folly::MSLGuard lock(ctx->lock_); - ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable { - // Either return a ItT&& or a Try&& depending - // on the type of the argument of func. - return ctx->func_(std::move(v), mt->template get()); - }); - if (++ctx->numThens_ == ctx->numFutures_) { - // After reducing the value of the last Future, fulfill the Promise - ctx->memo_.setCallback_([ctx](Try&& t2) { - ctx->promise_.setValue(std::move(t2)); + mapSetCallback( + first, + last, + [ctx](size_t /* i */, Try&& t) { + folly::MoveWrapper> mt(std::move(t)); + // Futures can be completed in any order, simultaneously. + // To make this non-blocking, we create a new Future chain in + // the order of completion to reduce the values. + // The spinlock just protects chaining a new Future, not actually + // executing the reduce, which should be really fast. + folly::MSLGuard lock(ctx->lock_); + ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable { + // Either return a ItT&& or a Try&& depending + // on the type of the argument of func. + return ctx->func_(std::move(v), + mt->template get()); + }); + if (++ctx->numThens_ == ctx->numFutures_) { + // After reducing the value of the last Future, fulfill the Promise + ctx->memo_.setCallback_( + [ctx](Try&& t2) { ctx->promise_.setValue(std::move(t2)); }); + } }); - } - }); return ctx->promise_.getFuture(); } +// within + template Future Future::within(Duration dur, Timekeeper* tk) { return within(dur, TimedOut(), tk); @@ -858,41 +902,49 @@ template Future Future::within(Duration dur, E e, Timekeeper* tk) { struct Context { - Context(E ex) : exception(std::move(ex)), promise(), token(false) {} + Context(E ex) : exception(std::move(ex)), promise() {} E exception; + Future thisFuture; Promise promise; - std::atomic token; + std::atomic token {false}; }; - auto ctx = std::make_shared(std::move(e)); + std::shared_ptr tks; if (!tk) { - tk = folly::detail::getTimekeeperSingleton(); + tks = folly::detail::getTimekeeperSingleton(); + tk = DCHECK_NOTNULL(tks.get()); } - tk->after(dur) - .then([ctx](Try const& t) { - if (ctx->token.exchange(true) == false) { - if (t.hasException()) { - ctx->promise.setException(std::move(t.exception())); - } else { - ctx->promise.setException(std::move(ctx->exception)); - } - } - }); + auto ctx = std::make_shared(std::move(e)); - this->then([ctx](Try&& t) { + ctx->thisFuture = this->then([ctx](Try&& t) mutable { + // TODO: "this" completed first, cancel "after" if (ctx->token.exchange(true) == false) { ctx->promise.setTry(std::move(t)); } }); - return ctx->promise.getFuture(); + tk->after(dur).then([ctx](Try const& t) mutable { + // "after" completed first, cancel "this" + ctx->thisFuture.raise(TimedOut()); + if (ctx->token.exchange(true) == false) { + if (t.hasException()) { + ctx->promise.setException(std::move(t.exception())); + } else { + ctx->promise.setException(std::move(ctx->exception)); + } + } + }); + + return ctx->promise.getFuture().via(getExecutor()); } +// delayed + template Future Future::delayed(Duration dur, Timekeeper* tk) { return collectAll(*this, futures::sleep(dur, tk)) - .then([](std::tuple, Try> tup) { + .then([](std::tuple, Try> tup) { Try& t = std::get<0>(tup); return makeFuture(std::move(t)); }); @@ -905,19 +957,10 @@ void waitImpl(Future& f) { // short-circuit if there's nothing to do if (f.isReady()) return; - folly::fibers::Baton baton; - f = f.then([&](Try t) { - baton.post(); - return makeFuture(std::move(t)); - }); + FutureBatonType baton; + f.setCallback_([&](const Try& /* t */) { baton.post(); }); baton.wait(); - - // There's a race here between the return here and the actual finishing of - // the future. f is completed, but the setup may not have finished on done - // after the baton has posted. - while (!f.isReady()) { - std::this_thread::yield(); - } + assert(f.isReady()); } template @@ -925,19 +968,16 @@ void waitImpl(Future& f, Duration dur) { // short-circuit if there's nothing to do if (f.isReady()) return; - auto baton = std::make_shared(); - f = f.then([baton](Try t) { + folly::MoveWrapper> promise; + auto ret = promise->getFuture(); + auto baton = std::make_shared(); + f.setCallback_([baton, promise](Try&& t) mutable { + promise->setTry(std::move(t)); baton->post(); - return makeFuture(std::move(t)); }); - - // Let's preserve the invariant that if we did not timeout (timed_wait returns - // true), then the returned Future is complete when it is returned to the - // caller. We need to wait out the race for that Future to complete. + f = std::move(ret); if (baton->timed_wait(dur)) { - while (!f.isReady()) { - std::this_thread::yield(); - } + assert(f.isReady()); } } @@ -991,11 +1031,6 @@ T Future::get() { return std::move(wait().value()); } -template <> -inline void Future::get() { - wait().value(); -} - template T Future::get(Duration dur) { wait(dur); @@ -1006,26 +1041,11 @@ T Future::get(Duration dur) { } } -template <> -inline void Future::get(Duration dur) { - wait(dur); - if (isReady()) { - return; - } else { - throw TimedOut(); - } -} - template T Future::getVia(DrivableExecutor* e) { return std::move(waitVia(e).value()); } -template <> -inline void Future::getVia(DrivableExecutor* e) { - waitVia(e).value(); -} - namespace detail { template struct TryEquals { @@ -1033,13 +1053,6 @@ namespace detail { return t1.value() == t2.value(); } }; - - template <> - struct TryEquals { - static bool equals(const Try& t1, const Try& t2) { - return true; - } - }; } template @@ -1106,6 +1119,31 @@ auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn) return then(x, std::forward(fn)); } +template +inline Future when(bool p, F thunk) { + return p ? thunk().unit() : makeFuture(); +} + +template +Future whileDo(P predicate, F thunk) { + if (predicate()) { + return thunk().then([=] { + return whileDo(predicate, thunk); + }); + } + return makeFuture(); +} + +template +Future times(const int n, F thunk) { + auto count = folly::makeMoveWrapper( + std::unique_ptr>(new std::atomic(0)) + ); + return folly::whileDo([=]() mutable { + return (*count)->fetch_add(1) < n; + }, thunk); +} + namespace futures { template std::vector> map(It first, It last, F func) { @@ -1117,8 +1155,199 @@ namespace futures { } } +namespace futures { + +namespace detail { + +struct retrying_policy_raw_tag {}; +struct retrying_policy_fut_tag {}; + +template +struct retrying_policy_traits { + using ew = exception_wrapper; + FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator()); + template + using has_op = typename std::integral_constant::value || + has_op_call::value>; + using is_raw = has_op; + using is_fut = has_op>; + using tag = typename std::conditional< + is_raw::value, retrying_policy_raw_tag, typename std::conditional< + is_fut::value, retrying_policy_fut_tag, void>::type>::type; +}; + +template +typename std::result_of::type +retrying(size_t k, Policy&& p, FF&& ff) { + using F = typename std::result_of::type; + using T = typename F::value_type; + auto f = ff(k++); + auto pm = makeMoveWrapper(p); + auto ffm = makeMoveWrapper(ff); + return f.onError([=](exception_wrapper x) mutable { + auto q = (*pm)(k, x); + auto xm = makeMoveWrapper(std::move(x)); + return q.then([=](bool r) mutable { + return r + ? retrying(k, pm.move(), ffm.move()) + : makeFuture(xm.move()); + }); + }); +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) { + auto pm = makeMoveWrapper(std::move(p)); + auto q = [=](size_t k, exception_wrapper x) { + return makeFuture((*pm)(k, x)); + }; + return retrying(0, std::move(q), std::forward(ff)); +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) { + return retrying(0, std::forward(p), std::forward(ff)); +} + +// jittered exponential backoff, clamped to [backoff_min, backoff_max] +template +Duration retryingJitteredExponentialBackoffDur( + size_t n, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG& rng) { + using d = Duration; + auto dist = std::normal_distribution(0.0, jitter_param); + auto jitter = std::exp(dist(rng)); + auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1))); + return std::max(backoff_min, std::min(backoff_max, backoff)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p) { + auto pm = makeMoveWrapper(std::move(p)); + auto rngp = std::make_shared(std::move(rng)); + return [=](size_t n, const exception_wrapper& ex) mutable { + if (n == max_tries) { return makeFuture(false); } + return (*pm)(n, ex).then([=](bool v) { + if (!v) { return makeFuture(false); } + auto backoff = detail::retryingJitteredExponentialBackoffDur( + n, backoff_min, backoff_max, jitter_param, *rngp); + return futures::sleep(backoff).then([] { return true; }); + }); + }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p, + retrying_policy_raw_tag) { + auto pm = makeMoveWrapper(std::move(p)); + auto q = [=](size_t n, const exception_wrapper& e) { + return makeFuture((*pm)(n, e)); + }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(q)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p, + retrying_policy_fut_tag) { + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(p)); +} + +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retrying(std::forward(p), std::forward(ff), tag()); +} + +inline +std::function +retryingPolicyBasic( + size_t max_tries) { + return [=](size_t n, const exception_wrapper&) { return n < max_tries; }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(p), + tag()); +} + +inline +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param) { + auto p = [](size_t, const exception_wrapper&) { return true; }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + ThreadLocalPRNG(), + std::move(p)); +} + +} + // Instantiate the most common Future types to save compile time -extern template class Future; +extern template class Future; extern template class Future; extern template class Future; extern template class Future; @@ -1126,8 +1355,3 @@ extern template class Future; extern template class Future; } // namespace folly - -// I haven't included a Future specialization because I don't forsee us -// using it, however it is not difficult to add when needed. Refer to -// Future for guidance. std::future and boost::future code would also be -// instructive.