folly: build with -Wunused-parameter
[folly.git] / folly / futures / Future-inl.h
index 0756fd34423f6e5a8e54ab1f3f978f4bb33884b4..091357ee5b23a796c14631cd2fcf001047b7e704 100644 (file)
 
 #pragma once
 
+#include <algorithm>
+#include <cassert>
 #include <chrono>
+#include <random>
 #include <thread>
-
-#include <folly/experimental/fibers/Baton.h>
+#include <folly/Baton.h>
 #include <folly/Optional.h>
+#include <folly/Random.h>
+#include <folly/Traits.h>
 #include <folly/futures/detail/Core.h>
 #include <folly/futures/Timekeeper.h>
 
+#if defined(__ANDROID__) || defined(__APPLE__)
+#define FOLLY_FUTURE_USING_FIBER 0
+#else
+#define FOLLY_FUTURE_USING_FIBER 1
+#include <folly/experimental/fibers/Baton.h>
+#endif
+
 namespace folly {
 
 class Timekeeper;
 
 namespace detail {
-  Timekeeper* getTimekeeperSingleton();
+#if FOLLY_FUTURE_USING_FIBER
+typedef folly::fibers::Baton FutureBatonType;
+#else
+typedef folly::Baton<> FutureBatonType;
+#endif
+}
+
+namespace detail {
+  std::shared_ptr<Timekeeper> getTimekeeperSingleton();
 }
 
 template <class T>
@@ -244,6 +263,7 @@ Future<T>::onError(F&& func) {
       "Return type of onError callback must be T or Future<T>");
 
   Promise<T> p;
+  p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
   auto f = p.getFuture();
   auto pm = folly::makeMoveWrapper(std::move(p));
   auto funcm = folly::makeMoveWrapper(std::move(func));
@@ -301,7 +321,7 @@ template <class T>
 template <class F>
 Future<T> Future<T>::ensure(F func) {
   MoveWrapper<F> funcw(std::move(func));
-  return this->then([funcw](Try<T>&& t) {
+  return this->then([funcw](Try<T>&& t) mutable {
     (*funcw)();
     return makeFuture(std::move(t));
   });
@@ -470,10 +490,32 @@ Future<Unit> makeFuture() {
   return makeFuture(Unit{});
 }
 
+// makeFutureWith(Future<T>()) -> Future<T>
 template <class F>
-auto makeFutureWith(F&& func)
-    -> Future<typename Unit::Lift<decltype(func())>::type> {
-  using LiftedResult = typename Unit::Lift<decltype(func())>::type;
+typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
+                        typename std::result_of<F()>::type>::type
+makeFutureWith(F&& func) {
+  using InnerType =
+      typename isFuture<typename std::result_of<F()>::type>::Inner;
+  try {
+    return func();
+  } catch (std::exception& e) {
+    return makeFuture<InnerType>(
+        exception_wrapper(std::current_exception(), e));
+  } catch (...) {
+    return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
+  }
+}
+
+// makeFutureWith(T()) -> Future<T>
+// makeFutureWith(void()) -> Future<Unit>
+template <class F>
+typename std::enable_if<
+    !(isFuture<typename std::result_of<F()>::type>::value),
+    Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
+makeFutureWith(F&& func) {
+  using LiftedResult =
+      typename Unit::Lift<typename std::result_of<F()>::type>::type;
   return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
     return func();
   }));
@@ -562,7 +604,9 @@ namespace detail {
 
 template <typename T>
 struct CollectContext {
-  struct Nothing { explicit Nothing(int n) {} };
+  struct Nothing {
+    explicit Nothing(int /* n */) {}
+  };
 
   using Result = typename std::conditional<
     std::is_void<T>::value,
@@ -819,26 +863,29 @@ Future<T> unorderedReduce(It first, It last, T initial, F func) {
   auto ctx = std::make_shared<UnorderedReduceContext>(
     std::move(initial), std::move(func), std::distance(first, last));
 
-  mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
-    folly::MoveWrapper<Try<ItT>> mt(std::move(t));
-    // Futures can be completed in any order, simultaneously.
-    // To make this non-blocking, we create a new Future chain in
-    // the order of completion to reduce the values.
-    // The spinlock just protects chaining a new Future, not actually
-    // executing the reduce, which should be really fast.
-    folly::MSLGuard lock(ctx->lock_);
-    ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
-      // Either return a ItT&& or a Try<ItT>&& depending
-      // on the type of the argument of func.
-      return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
-    });
-    if (++ctx->numThens_ == ctx->numFutures_) {
-      // After reducing the value of the last Future, fulfill the Promise
-      ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
-        ctx->promise_.setValue(std::move(t2));
+  mapSetCallback<ItT>(
+      first,
+      last,
+      [ctx](size_t /* i */, Try<ItT>&& t) {
+        folly::MoveWrapper<Try<ItT>> mt(std::move(t));
+        // Futures can be completed in any order, simultaneously.
+        // To make this non-blocking, we create a new Future chain in
+        // the order of completion to reduce the values.
+        // The spinlock just protects chaining a new Future, not actually
+        // executing the reduce, which should be really fast.
+        folly::MSLGuard lock(ctx->lock_);
+        ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
+          // Either return a ItT&& or a Try<ItT>&& depending
+          // on the type of the argument of func.
+          return ctx->func_(std::move(v),
+                            mt->template get<IsTry::value, Arg&&>());
+        });
+        if (++ctx->numThens_ == ctx->numFutures_) {
+          // After reducing the value of the last Future, fulfill the Promise
+          ctx->memo_.setCallback_(
+              [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
+        }
       });
-    }
-  });
 
   return ctx->promise_.getFuture();
 }
@@ -862,8 +909,10 @@ Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
     std::atomic<bool> token {false};
   };
 
+  std::shared_ptr<Timekeeper> tks;
   if (!tk) {
-    tk = folly::detail::getTimekeeperSingleton();
+    tks = folly::detail::getTimekeeperSingleton();
+    tk = DCHECK_NOTNULL(tks.get());
   }
 
   auto ctx = std::make_shared<Context>(std::move(e));
@@ -908,19 +957,10 @@ void waitImpl(Future<T>& f) {
   // short-circuit if there's nothing to do
   if (f.isReady()) return;
 
-  folly::fibers::Baton baton;
-  f = f.then([&](Try<T> t) {
-    baton.post();
-    return makeFuture(std::move(t));
-  });
+  FutureBatonType baton;
+  f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
   baton.wait();
-
-  // There's a race here between the return here and the actual finishing of
-  // the future. f is completed, but the setup may not have finished on done
-  // after the baton has posted.
-  while (!f.isReady()) {
-    std::this_thread::yield();
-  }
+  assert(f.isReady());
 }
 
 template <class T>
@@ -928,19 +968,16 @@ void waitImpl(Future<T>& f, Duration dur) {
   // short-circuit if there's nothing to do
   if (f.isReady()) return;
 
-  auto baton = std::make_shared<folly::fibers::Baton>();
-  f = f.then([baton](Try<T> t) {
+  folly::MoveWrapper<Promise<T>> promise;
+  auto ret = promise->getFuture();
+  auto baton = std::make_shared<FutureBatonType>();
+  f.setCallback_([baton, promise](Try<T>&& t) mutable {
+    promise->setTry(std::move(t));
     baton->post();
-    return makeFuture(std::move(t));
   });
-
-  // Let's preserve the invariant that if we did not timeout (timed_wait returns
-  // true), then the returned Future is complete when it is returned to the
-  // caller. We need to wait out the race for that Future to complete.
+  f = std::move(ret);
   if (baton->timed_wait(dur)) {
-    while (!f.isReady()) {
-      std::this_thread::yield();
-    }
+    assert(f.isReady());
   }
 }
 
@@ -1082,6 +1119,31 @@ auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
   return then(x, std::forward<Callback>(fn));
 }
 
+template <class F>
+inline Future<Unit> when(bool p, F thunk) {
+  return p ? thunk().unit() : makeFuture();
+}
+
+template <class P, class F>
+Future<Unit> whileDo(P predicate, F thunk) {
+  if (predicate()) {
+    return thunk().then([=] {
+      return whileDo(predicate, thunk);
+    });
+  }
+  return makeFuture();
+}
+
+template <class F>
+Future<Unit> times(const int n, F thunk) {
+  auto count = folly::makeMoveWrapper(
+    std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
+  );
+  return folly::whileDo([=]() mutable {
+      return (*count)->fetch_add(1) < n;
+    }, thunk);
+}
+
 namespace futures {
   template <class It, class F, class ItT, class Result>
   std::vector<Future<Result>> map(It first, It last, F func) {
@@ -1093,6 +1155,197 @@ namespace futures {
   }
 }
 
+namespace futures {
+
+namespace detail {
+
+struct retrying_policy_raw_tag {};
+struct retrying_policy_fut_tag {};
+
+template <class Policy>
+struct retrying_policy_traits {
+  using ew = exception_wrapper;
+  FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
+  template <class Ret>
+  using has_op = typename std::integral_constant<bool,
+        has_op_call<Policy, Ret(size_t, const ew&)>::value ||
+        has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
+  using is_raw = has_op<bool>;
+  using is_fut = has_op<Future<bool>>;
+  using tag = typename std::conditional<
+        is_raw::value, retrying_policy_raw_tag, typename std::conditional<
+        is_fut::value, retrying_policy_fut_tag, void>::type>::type;
+};
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(size_t k, Policy&& p, FF&& ff) {
+  using F = typename std::result_of<FF(size_t)>::type;
+  using T = typename F::value_type;
+  auto f = ff(k++);
+  auto pm = makeMoveWrapper(p);
+  auto ffm = makeMoveWrapper(ff);
+  return f.onError([=](exception_wrapper x) mutable {
+      auto q = (*pm)(k, x);
+      auto xm = makeMoveWrapper(std::move(x));
+      return q.then([=](bool r) mutable {
+          return r
+            ? retrying(k, pm.move(), ffm.move())
+            : makeFuture<T>(xm.move());
+      });
+  });
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
+  auto pm = makeMoveWrapper(std::move(p));
+  auto q = [=](size_t k, exception_wrapper x) {
+    return makeFuture<bool>((*pm)(k, x));
+  };
+  return retrying(0, std::move(q), std::forward<FF>(ff));
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
+  return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
+}
+
+//  jittered exponential backoff, clamped to [backoff_min, backoff_max]
+template <class URNG>
+Duration retryingJitteredExponentialBackoffDur(
+    size_t n,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG& rng) {
+  using d = Duration;
+  auto dist = std::normal_distribution<double>(0.0, jitter_param);
+  auto jitter = std::exp(dist(rng));
+  auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
+  return std::max(backoff_min, std::min(backoff_max, backoff));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG rng,
+    Policy&& p) {
+  auto pm = makeMoveWrapper(std::move(p));
+  auto rngp = std::make_shared<URNG>(std::move(rng));
+  return [=](size_t n, const exception_wrapper& ex) mutable {
+    if (n == max_tries) { return makeFuture(false); }
+    return (*pm)(n, ex).then([=](bool v) {
+        if (!v) { return makeFuture(false); }
+        auto backoff = detail::retryingJitteredExponentialBackoffDur(
+            n, backoff_min, backoff_max, jitter_param, *rngp);
+        return futures::sleep(backoff).then([] { return true; });
+    });
+  };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG rng,
+    Policy&& p,
+    retrying_policy_raw_tag) {
+  auto pm = makeMoveWrapper(std::move(p));
+  auto q = [=](size_t n, const exception_wrapper& e) {
+    return makeFuture((*pm)(n, e));
+  };
+  return retryingPolicyCappedJitteredExponentialBackoff(
+      max_tries,
+      backoff_min,
+      backoff_max,
+      jitter_param,
+      std::move(rng),
+      std::move(q));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG rng,
+    Policy&& p,
+    retrying_policy_fut_tag) {
+  return retryingPolicyCappedJitteredExponentialBackoff(
+      max_tries,
+      backoff_min,
+      backoff_max,
+      jitter_param,
+      std::move(rng),
+      std::move(p));
+}
+
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff) {
+  using tag = typename detail::retrying_policy_traits<Policy>::tag;
+  return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
+}
+
+inline
+std::function<bool(size_t, const exception_wrapper&)>
+retryingPolicyBasic(
+    size_t max_tries) {
+  return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param,
+    URNG rng,
+    Policy&& p) {
+  using tag = typename detail::retrying_policy_traits<Policy>::tag;
+  return detail::retryingPolicyCappedJitteredExponentialBackoff(
+      max_tries,
+      backoff_min,
+      backoff_max,
+      jitter_param,
+      std::move(rng),
+      std::move(p),
+      tag());
+}
+
+inline
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+    size_t max_tries,
+    Duration backoff_min,
+    Duration backoff_max,
+    double jitter_param) {
+  auto p = [](size_t, const exception_wrapper&) { return true; };
+  return retryingPolicyCappedJitteredExponentialBackoff(
+      max_tries,
+      backoff_min,
+      backoff_max,
+      jitter_param,
+      ThreadLocalPRNG(),
+      std::move(p));
+}
+
+}
+
 // Instantiate the most common Future types to save compile time
 extern template class Future<Unit>;
 extern template class Future<bool>;