X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Ffutures%2FFuture-inl.h;h=091357ee5b23a796c14631cd2fcf001047b7e704;hb=a1614feea3f3c0beb75fb2dc43ec45b3e5d57223;hp=0756fd34423f6e5a8e54ab1f3f978f4bb33884b4;hpb=031ed38a57114bca7a72dd61a29ad84feaab60ed;p=folly.git diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index 0756fd34..091357ee 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -16,20 +16,39 @@ #pragma once +#include +#include #include +#include #include - -#include +#include #include +#include +#include #include #include +#if defined(__ANDROID__) || defined(__APPLE__) +#define FOLLY_FUTURE_USING_FIBER 0 +#else +#define FOLLY_FUTURE_USING_FIBER 1 +#include +#endif + namespace folly { class Timekeeper; namespace detail { - Timekeeper* getTimekeeperSingleton(); +#if FOLLY_FUTURE_USING_FIBER +typedef folly::fibers::Baton FutureBatonType; +#else +typedef folly::Baton<> FutureBatonType; +#endif +} + +namespace detail { + std::shared_ptr getTimekeeperSingleton(); } template @@ -244,6 +263,7 @@ Future::onError(F&& func) { "Return type of onError callback must be T or Future"); Promise p; + p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler()); auto f = p.getFuture(); auto pm = folly::makeMoveWrapper(std::move(p)); auto funcm = folly::makeMoveWrapper(std::move(func)); @@ -301,7 +321,7 @@ template template Future Future::ensure(F func) { MoveWrapper funcw(std::move(func)); - return this->then([funcw](Try&& t) { + return this->then([funcw](Try&& t) mutable { (*funcw)(); return makeFuture(std::move(t)); }); @@ -470,10 +490,32 @@ Future makeFuture() { return makeFuture(Unit{}); } +// makeFutureWith(Future()) -> Future template -auto makeFutureWith(F&& func) - -> Future::type> { - using LiftedResult = typename Unit::Lift::type; +typename std::enable_if::type>::value, + typename std::result_of::type>::type +makeFutureWith(F&& func) { + using InnerType = + typename isFuture::type>::Inner; + try { + return func(); + } catch (std::exception& e) { + return makeFuture( + exception_wrapper(std::current_exception(), e)); + } catch (...) { + return makeFuture(exception_wrapper(std::current_exception())); + } +} + +// makeFutureWith(T()) -> Future +// makeFutureWith(void()) -> Future +template +typename std::enable_if< + !(isFuture::type>::value), + Future::type>::type>>::type +makeFutureWith(F&& func) { + using LiftedResult = + typename Unit::Lift::type>::type; return makeFuture(makeTryWith([&func]() mutable { return func(); })); @@ -562,7 +604,9 @@ namespace detail { template struct CollectContext { - struct Nothing { explicit Nothing(int n) {} }; + struct Nothing { + explicit Nothing(int /* n */) {} + }; using Result = typename std::conditional< std::is_void::value, @@ -819,26 +863,29 @@ Future unorderedReduce(It first, It last, T initial, F func) { auto ctx = std::make_shared( std::move(initial), std::move(func), std::distance(first, last)); - mapSetCallback(first, last, [ctx](size_t i, Try&& t) { - folly::MoveWrapper> mt(std::move(t)); - // Futures can be completed in any order, simultaneously. - // To make this non-blocking, we create a new Future chain in - // the order of completion to reduce the values. - // The spinlock just protects chaining a new Future, not actually - // executing the reduce, which should be really fast. - folly::MSLGuard lock(ctx->lock_); - ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable { - // Either return a ItT&& or a Try&& depending - // on the type of the argument of func. - return ctx->func_(std::move(v), mt->template get()); - }); - if (++ctx->numThens_ == ctx->numFutures_) { - // After reducing the value of the last Future, fulfill the Promise - ctx->memo_.setCallback_([ctx](Try&& t2) { - ctx->promise_.setValue(std::move(t2)); + mapSetCallback( + first, + last, + [ctx](size_t /* i */, Try&& t) { + folly::MoveWrapper> mt(std::move(t)); + // Futures can be completed in any order, simultaneously. + // To make this non-blocking, we create a new Future chain in + // the order of completion to reduce the values. + // The spinlock just protects chaining a new Future, not actually + // executing the reduce, which should be really fast. + folly::MSLGuard lock(ctx->lock_); + ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable { + // Either return a ItT&& or a Try&& depending + // on the type of the argument of func. + return ctx->func_(std::move(v), + mt->template get()); + }); + if (++ctx->numThens_ == ctx->numFutures_) { + // After reducing the value of the last Future, fulfill the Promise + ctx->memo_.setCallback_( + [ctx](Try&& t2) { ctx->promise_.setValue(std::move(t2)); }); + } }); - } - }); return ctx->promise_.getFuture(); } @@ -862,8 +909,10 @@ Future Future::within(Duration dur, E e, Timekeeper* tk) { std::atomic token {false}; }; + std::shared_ptr tks; if (!tk) { - tk = folly::detail::getTimekeeperSingleton(); + tks = folly::detail::getTimekeeperSingleton(); + tk = DCHECK_NOTNULL(tks.get()); } auto ctx = std::make_shared(std::move(e)); @@ -908,19 +957,10 @@ void waitImpl(Future& f) { // short-circuit if there's nothing to do if (f.isReady()) return; - folly::fibers::Baton baton; - f = f.then([&](Try t) { - baton.post(); - return makeFuture(std::move(t)); - }); + FutureBatonType baton; + f.setCallback_([&](const Try& /* t */) { baton.post(); }); baton.wait(); - - // There's a race here between the return here and the actual finishing of - // the future. f is completed, but the setup may not have finished on done - // after the baton has posted. - while (!f.isReady()) { - std::this_thread::yield(); - } + assert(f.isReady()); } template @@ -928,19 +968,16 @@ void waitImpl(Future& f, Duration dur) { // short-circuit if there's nothing to do if (f.isReady()) return; - auto baton = std::make_shared(); - f = f.then([baton](Try t) { + folly::MoveWrapper> promise; + auto ret = promise->getFuture(); + auto baton = std::make_shared(); + f.setCallback_([baton, promise](Try&& t) mutable { + promise->setTry(std::move(t)); baton->post(); - return makeFuture(std::move(t)); }); - - // Let's preserve the invariant that if we did not timeout (timed_wait returns - // true), then the returned Future is complete when it is returned to the - // caller. We need to wait out the race for that Future to complete. + f = std::move(ret); if (baton->timed_wait(dur)) { - while (!f.isReady()) { - std::this_thread::yield(); - } + assert(f.isReady()); } } @@ -1082,6 +1119,31 @@ auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn) return then(x, std::forward(fn)); } +template +inline Future when(bool p, F thunk) { + return p ? thunk().unit() : makeFuture(); +} + +template +Future whileDo(P predicate, F thunk) { + if (predicate()) { + return thunk().then([=] { + return whileDo(predicate, thunk); + }); + } + return makeFuture(); +} + +template +Future times(const int n, F thunk) { + auto count = folly::makeMoveWrapper( + std::unique_ptr>(new std::atomic(0)) + ); + return folly::whileDo([=]() mutable { + return (*count)->fetch_add(1) < n; + }, thunk); +} + namespace futures { template std::vector> map(It first, It last, F func) { @@ -1093,6 +1155,197 @@ namespace futures { } } +namespace futures { + +namespace detail { + +struct retrying_policy_raw_tag {}; +struct retrying_policy_fut_tag {}; + +template +struct retrying_policy_traits { + using ew = exception_wrapper; + FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator()); + template + using has_op = typename std::integral_constant::value || + has_op_call::value>; + using is_raw = has_op; + using is_fut = has_op>; + using tag = typename std::conditional< + is_raw::value, retrying_policy_raw_tag, typename std::conditional< + is_fut::value, retrying_policy_fut_tag, void>::type>::type; +}; + +template +typename std::result_of::type +retrying(size_t k, Policy&& p, FF&& ff) { + using F = typename std::result_of::type; + using T = typename F::value_type; + auto f = ff(k++); + auto pm = makeMoveWrapper(p); + auto ffm = makeMoveWrapper(ff); + return f.onError([=](exception_wrapper x) mutable { + auto q = (*pm)(k, x); + auto xm = makeMoveWrapper(std::move(x)); + return q.then([=](bool r) mutable { + return r + ? retrying(k, pm.move(), ffm.move()) + : makeFuture(xm.move()); + }); + }); +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) { + auto pm = makeMoveWrapper(std::move(p)); + auto q = [=](size_t k, exception_wrapper x) { + return makeFuture((*pm)(k, x)); + }; + return retrying(0, std::move(q), std::forward(ff)); +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) { + return retrying(0, std::forward(p), std::forward(ff)); +} + +// jittered exponential backoff, clamped to [backoff_min, backoff_max] +template +Duration retryingJitteredExponentialBackoffDur( + size_t n, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG& rng) { + using d = Duration; + auto dist = std::normal_distribution(0.0, jitter_param); + auto jitter = std::exp(dist(rng)); + auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1))); + return std::max(backoff_min, std::min(backoff_max, backoff)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p) { + auto pm = makeMoveWrapper(std::move(p)); + auto rngp = std::make_shared(std::move(rng)); + return [=](size_t n, const exception_wrapper& ex) mutable { + if (n == max_tries) { return makeFuture(false); } + return (*pm)(n, ex).then([=](bool v) { + if (!v) { return makeFuture(false); } + auto backoff = detail::retryingJitteredExponentialBackoffDur( + n, backoff_min, backoff_max, jitter_param, *rngp); + return futures::sleep(backoff).then([] { return true; }); + }); + }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p, + retrying_policy_raw_tag) { + auto pm = makeMoveWrapper(std::move(p)); + auto q = [=](size_t n, const exception_wrapper& e) { + return makeFuture((*pm)(n, e)); + }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(q)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p, + retrying_policy_fut_tag) { + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(p)); +} + +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retrying(std::forward(p), std::forward(ff), tag()); +} + +inline +std::function +retryingPolicyBasic( + size_t max_tries) { + return [=](size_t n, const exception_wrapper&) { return n < max_tries; }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG rng, + Policy&& p) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::move(rng), + std::move(p), + tag()); +} + +inline +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param) { + auto p = [](size_t, const exception_wrapper&) { return true; }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + ThreadLocalPRNG(), + std::move(p)); +} + +} + // Instantiate the most common Future types to save compile time extern template class Future; extern template class Future;