From 24a9e5fd0aa688ac3601ebf563209a77c2d8e913 Mon Sep 17 00:00:00 2001 From: Marc Celani Date: Sun, 15 Oct 2017 14:24:44 -0700 Subject: [PATCH] Move retrying method to separate header in folly/futures Summary: folly/futures depends on folly/Random.h, which in turn depends on , which is a fairly large header. Most users of folly::futures do not use retrying, so separate it into a separate header. Reviewed By: yfeldblum Differential Revision: D6028468 fbshipit-source-id: d8155fe2ddff1a65c265a18f040ee6f1be3f3f0a --- folly/Makefile.am | 1 + folly/futures/Future-inl.h | 223 ---------------------- folly/futures/Retrying.h | 276 ++++++++++++++++++++++++++++ folly/futures/helpers.h | 74 +------- folly/futures/test/RetryingTest.cpp | 2 +- 5 files changed, 279 insertions(+), 297 deletions(-) create mode 100644 folly/futures/Retrying.h diff --git a/folly/Makefile.am b/folly/Makefile.am index b08d42b0..ad956fb4 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -215,6 +215,7 @@ nobase_follyinclude_HEADERS = \ futures/Promise-inl.h \ futures/Promise.h \ futures/QueuedImmediateExecutor.h \ + futures/Retrying.h \ futures/ScheduledExecutor.h \ futures/SharedPromise.h \ futures/SharedPromise-inl.h \ diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index 18130baf..7a4f073f 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -19,12 +19,10 @@ #include #include #include -#include #include #include #include -#include #include #include @@ -1442,227 +1440,6 @@ namespace futures { } } -namespace futures { - -namespace detail { - -struct retrying_policy_raw_tag {}; -struct retrying_policy_fut_tag {}; - -template -struct retrying_policy_traits { - using result = std::result_of_t; - using is_raw = std::is_same; - using is_fut = std::is_same>; - using tag = typename std::conditional< - is_raw::value, retrying_policy_raw_tag, typename std::conditional< - is_fut::value, retrying_policy_fut_tag, void>::type>::type; -}; - -template -void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) { - using F = typename std::result_of::type; - using T = typename F::value_type; - auto f = makeFutureWith([&] { return ff(k++); }); - f.then([ - k, - prom = std::move(prom), - pm = std::forward(p), - ffm = std::forward(ff) - ](Try && t) mutable { - if (t.hasValue()) { - prom.setValue(std::move(t).value()); - return; - } - auto& x = t.exception(); - auto q = pm(k, x); - q.then([ - k, - prom = std::move(prom), - xm = std::move(x), - pm = std::move(pm), - ffm = std::move(ffm) - ](bool shouldRetry) mutable { - if (shouldRetry) { - retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom)); - } else { - prom.setException(std::move(xm)); - }; - }); - }); -} - -template -typename std::result_of::type -retrying(size_t k, Policy&& p, FF&& ff) { - using F = typename std::result_of::type; - using T = typename F::value_type; - auto prom = Promise(); - auto f = prom.getFuture(); - retryingImpl( - k, std::forward(p), std::forward(ff), std::move(prom)); - return f; -} - -template -typename std::result_of::type -retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) { - auto q = [pm = std::forward(p)](size_t k, exception_wrapper x) { - return makeFuture(pm(k, x)); - }; - return retrying(0, std::move(q), std::forward(ff)); -} - -template -typename std::result_of::type -retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) { - return retrying(0, std::forward(p), std::forward(ff)); -} - -// jittered exponential backoff, clamped to [backoff_min, backoff_max] -template -Duration retryingJitteredExponentialBackoffDur( - size_t n, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG& rng) { - using d = Duration; - auto dist = std::normal_distribution(0.0, jitter_param); - auto jitter = std::exp(dist(rng)); - auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1))); - return std::max(backoff_min, std::min(backoff_max, backoff)); -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p) { - return [ - pm = std::forward(p), - max_tries, - backoff_min, - backoff_max, - jitter_param, - rngp = std::forward(rng) - ](size_t n, const exception_wrapper& ex) mutable { - if (n == max_tries) { - return makeFuture(false); - } - return pm(n, ex).then( - [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ]( - bool v) mutable { - if (!v) { - return makeFuture(false); - } - auto backoff = detail::retryingJitteredExponentialBackoffDur( - n, backoff_min, backoff_max, jitter_param, rngp); - return futures::sleep(backoff).then([] { return true; }); - }); - }; -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p, - retrying_policy_raw_tag) { - auto q = [pm = std::forward(p)]( - size_t n, const exception_wrapper& e) { - return makeFuture(pm(n, e)); - }; - return retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - std::forward(rng), - std::move(q)); -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p, - retrying_policy_fut_tag) { - return retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - std::forward(rng), - std::forward(p)); -} -} - -template -typename std::result_of::type -retrying(Policy&& p, FF&& ff) { - using tag = typename detail::retrying_policy_traits::tag; - return detail::retrying(std::forward(p), std::forward(ff), tag()); -} - -inline -std::function -retryingPolicyBasic( - size_t max_tries) { - return [=](size_t n, const exception_wrapper&) { return n < max_tries; }; -} - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p) { - using tag = typename detail::retrying_policy_traits::tag; - return detail::retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - std::forward(rng), - std::forward(p), - tag()); -} - -inline -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param) { - auto p = [](size_t, const exception_wrapper&) { return true; }; - return retryingPolicyCappedJitteredExponentialBackoff( - max_tries, - backoff_min, - backoff_max, - jitter_param, - ThreadLocalPRNG(), - std::move(p)); -} - -} - // Instantiate the most common Future types to save compile time extern template class Future; extern template class Future; diff --git a/folly/futures/Retrying.h b/folly/futures/Retrying.h new file mode 100644 index 00000000..91dfb236 --- /dev/null +++ b/folly/futures/Retrying.h @@ -0,0 +1,276 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace folly { +namespace futures { + +/** + * retrying + * + * Given a policy and a future-factory, creates futures according to the + * policy. + * + * The policy must be moveable - retrying will move it a lot - and callable of + * either of the two forms: + * - Future(size_t, exception_wrapper) + * - bool(size_t, exception_wrapper) + * Internally, the latter is transformed into the former in the obvious way. + * The first parameter is the attempt number of the next prospective attempt; + * the second parameter is the most recent exception. The policy returns a + * Future which, when completed with true, indicates that a retry is + * desired. + * + * We provide a few generic policies: + * - Basic + * - CappedJitteredexponentialBackoff + * + * Custom policies may use the most recent try number and exception to decide + * whether to retry and optionally to do something interesting like delay + * before the retry. Users may pass inline lambda expressions as policies, or + * may define their own data types meeting the above requirements. Users are + * responsible for managing the lifetimes of anything pointed to or referred to + * from inside the policy. + * + * For example, one custom policy may try up to k times, but only if the most + * recent exception is one of a few types or has one of a few error codes + * indicating that the failure was transitory. + * + * Cancellation is not supported. + * + * If both FF and Policy inline executes, then it is possible to hit a stack + * overflow due to the recursive nature of the retry implementation + */ +template +typename std::result_of::type retrying(Policy&& p, FF&& ff); + +namespace detail { + +struct retrying_policy_raw_tag {}; +struct retrying_policy_fut_tag {}; + +template +struct retrying_policy_traits { + using result = std::result_of_t; + using is_raw = std::is_same; + using is_fut = std::is_same>; + using tag = typename std::conditional< + is_raw::value, + retrying_policy_raw_tag, + typename std::conditional:: + type>::type; +}; + +template +void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) { + using F = typename std::result_of::type; + using T = typename F::value_type; + auto f = makeFutureWith([&] { return ff(k++); }); + f.then([k, + prom = std::move(prom), + pm = std::forward(p), + ffm = std::forward(ff)](Try&& t) mutable { + if (t.hasValue()) { + prom.setValue(std::move(t).value()); + return; + } + auto& x = t.exception(); + auto q = pm(k, x); + q.then([k, + prom = std::move(prom), + xm = std::move(x), + pm = std::move(pm), + ffm = std::move(ffm)](bool shouldRetry) mutable { + if (shouldRetry) { + retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom)); + } else { + prom.setException(std::move(xm)); + }; + }); + }); +} + +template +typename std::result_of::type +retrying(size_t k, Policy&& p, FF&& ff) { + using F = typename std::result_of::type; + using T = typename F::value_type; + auto prom = Promise(); + auto f = prom.getFuture(); + retryingImpl( + k, std::forward(p), std::forward(ff), std::move(prom)); + return f; +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) { + auto q = [pm = std::forward(p)](size_t k, exception_wrapper x) { + return makeFuture(pm(k, x)); + }; + return retrying(0, std::move(q), std::forward(ff)); +} + +template +typename std::result_of::type +retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) { + return retrying(0, std::forward(p), std::forward(ff)); +} + +// jittered exponential backoff, clamped to [backoff_min, backoff_max] +template +Duration retryingJitteredExponentialBackoffDur( + size_t n, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG& rng) { + using d = Duration; + auto dist = std::normal_distribution(0.0, jitter_param); + auto jitter = std::exp(dist(rng)); + auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1))); + return std::max(backoff_min, std::min(backoff_max, backoff)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG&& rng, + Policy&& p) { + return [pm = std::forward(p), + max_tries, + backoff_min, + backoff_max, + jitter_param, + rngp = std::forward(rng)]( + size_t n, const exception_wrapper& ex) mutable { + if (n == max_tries) { + return makeFuture(false); + } + return pm(n, ex).then( + [n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp)]( + bool v) mutable { + if (!v) { + return makeFuture(false); + } + auto backoff = detail::retryingJitteredExponentialBackoffDur( + n, backoff_min, backoff_max, jitter_param, rngp); + return futures::sleep(backoff).then([] { return true; }); + }); + }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG&& rng, + Policy&& p, + retrying_policy_raw_tag) { + auto q = [pm = std::forward(p)]( + size_t n, const exception_wrapper& e) { + return makeFuture(pm(n, e)); + }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::forward(rng), + std::move(q)); +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG&& rng, + Policy&& p, + retrying_policy_fut_tag) { + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::forward(rng), + std::forward(p)); +} + +} // namespace detail + +template +typename std::result_of::type retrying(Policy&& p, FF&& ff) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retrying(std::forward(p), std::forward(ff), tag()); +} + +inline std::function +retryingPolicyBasic(size_t max_tries) { + return [=](size_t n, const exception_wrapper&) { return n < max_tries; }; +} + +template +std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param, + URNG&& rng, + Policy&& p) { + using tag = typename detail::retrying_policy_traits::tag; + return detail::retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + std::forward(rng), + std::forward(p), + tag()); +} + +inline std::function(size_t, const exception_wrapper&)> +retryingPolicyCappedJitteredExponentialBackoff( + size_t max_tries, + Duration backoff_min, + Duration backoff_max, + double jitter_param) { + auto p = [](size_t, const exception_wrapper&) { return true; }; + return retryingPolicyCappedJitteredExponentialBackoff( + max_tries, + backoff_min, + backoff_max, + jitter_param, + ThreadLocalPRNG(), + std::move(p)); +} + +} // namespace futures +} // namespace folly diff --git a/folly/futures/helpers.h b/folly/futures/helpers.h index 2dc55d1b..ff132af0 100644 --- a/folly/futures/helpers.h +++ b/folly/futures/helpers.h @@ -384,76 +384,4 @@ auto unorderedReduce(Collection&& c, T&& initial, F&& func) std::forward(initial), std::forward(func)); } - -namespace futures { - -/** - * retrying - * - * Given a policy and a future-factory, creates futures according to the - * policy. - * - * The policy must be moveable - retrying will move it a lot - and callable of - * either of the two forms: - * - Future(size_t, exception_wrapper) - * - bool(size_t, exception_wrapper) - * Internally, the latter is transformed into the former in the obvious way. - * The first parameter is the attempt number of the next prospective attempt; - * the second parameter is the most recent exception. The policy returns a - * Future which, when completed with true, indicates that a retry is - * desired. - * - * We provide a few generic policies: - * - Basic - * - CappedJitteredexponentialBackoff - * - * Custom policies may use the most recent try number and exception to decide - * whether to retry and optionally to do something interesting like delay - * before the retry. Users may pass inline lambda expressions as policies, or - * may define their own data types meeting the above requirements. Users are - * responsible for managing the lifetimes of anything pointed to or referred to - * from inside the policy. - * - * For example, one custom policy may try up to k times, but only if the most - * recent exception is one of a few types or has one of a few error codes - * indicating that the failure was transitory. - * - * Cancellation is not supported. - * - * If both FF and Policy inline executes, then it is possible to hit a stack - * overflow due to the recursive nature of the retry implementation - */ -template -typename std::result_of::type -retrying(Policy&& p, FF&& ff); - -/** - * generic retrying policies - */ - -inline -std::function -retryingPolicyBasic( - size_t max_tries); - -template -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param, - URNG&& rng, - Policy&& p); - -inline -std::function(size_t, const exception_wrapper&)> -retryingPolicyCappedJitteredExponentialBackoff( - size_t max_tries, - Duration backoff_min, - Duration backoff_max, - double jitter_param); - -} - -} // namespace +} // namespace folly diff --git a/folly/futures/test/RetryingTest.cpp b/folly/futures/test/RetryingTest.cpp index 61505c6a..c99f3fce 100644 --- a/folly/futures/test/RetryingTest.cpp +++ b/folly/futures/test/RetryingTest.cpp @@ -18,7 +18,7 @@ #include #include -#include +#include #include #include #include "TestExecutor.h" -- 2.34.1