#pragma once
+#include <algorithm>
+#include <cassert>
#include <chrono>
+#include <random>
#include <thread>
-
-#include <folly/experimental/fibers/Baton.h>
+#include <folly/Baton.h>
#include <folly/Optional.h>
+#include <folly/Random.h>
+#include <folly/Traits.h>
#include <folly/futures/detail/Core.h>
#include <folly/futures/Timekeeper.h>
+#if defined(__ANDROID__) || defined(__APPLE__)
+#define FOLLY_FUTURE_USING_FIBER 0
+#else
+#define FOLLY_FUTURE_USING_FIBER 1
+#include <folly/experimental/fibers/Baton.h>
+#endif
+
namespace folly {
class Timekeeper;
namespace detail {
- Timekeeper* getTimekeeperSingleton();
+#if FOLLY_FUTURE_USING_FIBER
+typedef folly::fibers::Baton FutureBatonType;
+#else
+typedef folly::Baton<> FutureBatonType;
+#endif
+}
+
+namespace detail {
+ std::shared_ptr<Timekeeper> getTimekeeperSingleton();
}
template <class T>
: core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
template <class T>
-template <class T2,
- typename std::enable_if<
- folly::is_void_or_unit<T2>::value,
- int>::type>
+template <typename, typename>
Future<T>::Future()
- : core_(new detail::Core<T>(Try<T>())) {}
-
+ : core_(new detail::Core<T>(Try<T>(T()))) {}
template <class T>
Future<T>::~Future() {
// wrap these so we can move them into the lambda
folly::MoveWrapper<Promise<B>> p;
- p->setInterruptHandler(core_->getInterruptHandler());
+ p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
folly::MoveWrapper<F> funcm(std::forward<F>(func));
// grab the Future now before we lose our handle on the Promise
auto f = p->getFuture();
- if (getExecutor()) {
- f.setExecutor(getExecutor());
- }
+ f.core_->setExecutorNoLock(getExecutor());
/* This is a bit tricky.
// wrap these so we can move them into the lambda
folly::MoveWrapper<Promise<B>> p;
+ p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
folly::MoveWrapper<F> funcm(std::forward<F>(func));
// grab the Future now before we lose our handle on the Promise
auto f = p->getFuture();
- if (getExecutor()) {
- f.setExecutor(getExecutor());
- }
+ f.core_->setExecutorNoLock(getExecutor());
setCallback_(
[p, funcm](Try<T>&& t) mutable {
}
template <class T>
-Future<void> Future<T>::then() {
- return then([] (Try<T>&& t) {});
+Future<Unit> Future<T>::then() {
+ return then([] () {});
}
// onError where the callback returns T
"Return type of onError callback must be T or Future<T>");
Promise<T> p;
+ p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
auto f = p.getFuture();
auto pm = folly::makeMoveWrapper(std::move(p));
auto funcm = folly::makeMoveWrapper(std::move(func));
template <class F>
Future<T> Future<T>::ensure(F func) {
MoveWrapper<F> funcw(std::move(func));
- return this->then([funcw](Try<T>&& t) {
+ return this->then([funcw](Try<T>&& t) mutable {
(*funcw)();
return makeFuture(std::move(t));
});
template <class Func>
auto via(Executor* x, Func func)
-> Future<typename isFuture<decltype(func())>::Inner>
-// this would work, if not for Future<void> :-/
-// -> decltype(via(x).then(func))
{
// TODO make this actually more performant. :-P #7260175
return via(x).then(func);
return core_->ready();
}
+template <class T>
+bool Future<T>::hasValue() {
+ return getTry().hasValue();
+}
+
+template <class T>
+bool Future<T>::hasException() {
+ return getTry().hasException();
+}
+
template <class T>
void Future<T>::raise(exception_wrapper exception) {
core_->raise(std::move(exception));
}
inline // for multiple translation units
-Future<void> makeFuture() {
- return makeFuture(Try<void>());
+Future<Unit> makeFuture() {
+ return makeFuture(Unit{});
}
+// makeFutureWith(Future<T>()) -> Future<T>
template <class F>
-auto makeFutureWith(
- F&& func,
- typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
- -> Future<decltype(func())> {
- return makeFuture(makeTryWith([&func]() {
- return (func)();
- }));
+typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
+ typename std::result_of<F()>::type>::type
+makeFutureWith(F&& func) {
+ using InnerType =
+ typename isFuture<typename std::result_of<F()>::type>::Inner;
+ try {
+ return func();
+ } catch (std::exception& e) {
+ return makeFuture<InnerType>(
+ exception_wrapper(std::current_exception(), e));
+ } catch (...) {
+ return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
+ }
}
+// makeFutureWith(T()) -> Future<T>
+// makeFutureWith(void()) -> Future<Unit>
template <class F>
-auto makeFutureWith(F const& func) -> Future<decltype(func())> {
- F copy = func;
- return makeFuture(makeTryWith(std::move(copy)));
+typename std::enable_if<
+ !(isFuture<typename std::result_of<F()>::type>::value),
+ Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
+makeFutureWith(F&& func) {
+ using LiftedResult =
+ typename Unit::Lift<typename std::result_of<F()>::type>::type;
+ return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
+ return func();
+ }));
}
template <class T>
}
// via
-Future<void> via(Executor* executor, int8_t priority) {
+Future<Unit> via(Executor* executor, int8_t priority) {
return makeFuture().via(executor, priority);
}
template <typename T>
struct CollectContext {
- struct Nothing { explicit Nothing(int n) {} };
+ struct Nothing {
+ explicit Nothing(int /* n */) {}
+ };
using Result = typename std::conditional<
std::is_void<T>::value,
}
Promise<Result> p;
InternalResult result;
- std::atomic<bool> threw;
+ std::atomic<bool> threw {false};
};
-// Specialize for void (implementations in Future.cpp)
-
-template <>
-CollectContext<void>::~CollectContext();
-
-template <>
-void CollectContext<void>::setPartialResult(size_t i, Try<void>& t);
-
}
template <class InputIterator>
typename std::iterator_traits<InputIterator>::value_type::value_type T;
struct CollectAnyContext {
- CollectAnyContext(size_t n) : done(false) {};
+ CollectAnyContext() {};
Promise<std::pair<size_t, Try<T>>> p;
- std::atomic<bool> done;
+ std::atomic<bool> done {false};
};
- auto ctx = std::make_shared<CollectAnyContext>(std::distance(first, last));
+ auto ctx = std::make_shared<CollectAnyContext>();
mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
if (!ctx->done.exchange(true)) {
ctx->p.setValue(std::make_pair(i, std::move(t)));
auto c = ++ctx->completed;
if (c <= n) {
assert(ctx->v.size() < n);
- ctx->v.push_back(std::make_pair(i, std::move(t)));
+ ctx->v.emplace_back(i, std::move(t));
if (c == n) {
ctx->p.setTry(Try<V>(std::move(ctx->v)));
}
window(Collection input, F func, size_t n) {
struct WindowContext {
WindowContext(Collection&& i, F&& fn)
- : i_(0), input_(std::move(i)), promises_(input_.size()),
+ : input_(std::move(i)), promises_(input_.size()),
func_(std::move(fn))
{}
- std::atomic<size_t> i_;
+ std::atomic<size_t> i_ {0};
Collection input_;
std::vector<Promise<Result>> promises_;
F func_;
auto ctx = std::make_shared<UnorderedReduceContext>(
std::move(initial), std::move(func), std::distance(first, last));
- mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
- folly::MoveWrapper<Try<ItT>> mt(std::move(t));
- // Futures can be completed in any order, simultaneously.
- // To make this non-blocking, we create a new Future chain in
- // the order of completion to reduce the values.
- // The spinlock just protects chaining a new Future, not actually
- // executing the reduce, which should be really fast.
- folly::MSLGuard lock(ctx->lock_);
- ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
- // Either return a ItT&& or a Try<ItT>&& depending
- // on the type of the argument of func.
- return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
- });
- if (++ctx->numThens_ == ctx->numFutures_) {
- // After reducing the value of the last Future, fulfill the Promise
- ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
- ctx->promise_.setValue(std::move(t2));
+ mapSetCallback<ItT>(
+ first,
+ last,
+ [ctx](size_t /* i */, Try<ItT>&& t) {
+ folly::MoveWrapper<Try<ItT>> mt(std::move(t));
+ // Futures can be completed in any order, simultaneously.
+ // To make this non-blocking, we create a new Future chain in
+ // the order of completion to reduce the values.
+ // The spinlock just protects chaining a new Future, not actually
+ // executing the reduce, which should be really fast.
+ folly::MSLGuard lock(ctx->lock_);
+ ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
+ // Either return a ItT&& or a Try<ItT>&& depending
+ // on the type of the argument of func.
+ return ctx->func_(std::move(v),
+ mt->template get<IsTry::value, Arg&&>());
+ });
+ if (++ctx->numThens_ == ctx->numFutures_) {
+ // After reducing the value of the last Future, fulfill the Promise
+ ctx->memo_.setCallback_(
+ [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
+ }
});
- }
- });
return ctx->promise_.getFuture();
}
Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
struct Context {
- Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
+ Context(E ex) : exception(std::move(ex)), promise() {}
E exception;
+ Future<Unit> thisFuture;
Promise<T> promise;
- std::atomic<bool> token;
+ std::atomic<bool> token {false};
};
- auto ctx = std::make_shared<Context>(std::move(e));
+ std::shared_ptr<Timekeeper> tks;
if (!tk) {
- tk = folly::detail::getTimekeeperSingleton();
+ tks = folly::detail::getTimekeeperSingleton();
+ tk = DCHECK_NOTNULL(tks.get());
}
- tk->after(dur)
- .then([ctx](Try<void> const& t) {
- if (ctx->token.exchange(true) == false) {
- if (t.hasException()) {
- ctx->promise.setException(std::move(t.exception()));
- } else {
- ctx->promise.setException(std::move(ctx->exception));
- }
- }
- });
+ auto ctx = std::make_shared<Context>(std::move(e));
- this->then([ctx](Try<T>&& t) {
+ ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
+ // TODO: "this" completed first, cancel "after"
if (ctx->token.exchange(true) == false) {
ctx->promise.setTry(std::move(t));
}
});
+ tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
+ // "after" completed first, cancel "this"
+ ctx->thisFuture.raise(TimedOut());
+ if (ctx->token.exchange(true) == false) {
+ if (t.hasException()) {
+ ctx->promise.setException(std::move(t.exception()));
+ } else {
+ ctx->promise.setException(std::move(ctx->exception));
+ }
+ }
+ });
+
return ctx->promise.getFuture().via(getExecutor());
}
template <class T>
Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
return collectAll(*this, futures::sleep(dur, tk))
- .then([](std::tuple<Try<T>, Try<void>> tup) {
+ .then([](std::tuple<Try<T>, Try<Unit>> tup) {
Try<T>& t = std::get<0>(tup);
return makeFuture<T>(std::move(t));
});
// short-circuit if there's nothing to do
if (f.isReady()) return;
- folly::fibers::Baton baton;
- f = f.then([&](Try<T> t) {
- baton.post();
- return makeFuture(std::move(t));
- });
+ FutureBatonType baton;
+ f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
baton.wait();
-
- // There's a race here between the return here and the actual finishing of
- // the future. f is completed, but the setup may not have finished on done
- // after the baton has posted.
- while (!f.isReady()) {
- std::this_thread::yield();
- }
+ assert(f.isReady());
}
template <class T>
// short-circuit if there's nothing to do
if (f.isReady()) return;
- auto baton = std::make_shared<folly::fibers::Baton>();
- f = f.then([baton](Try<T> t) {
+ folly::MoveWrapper<Promise<T>> promise;
+ auto ret = promise->getFuture();
+ auto baton = std::make_shared<FutureBatonType>();
+ f.setCallback_([baton, promise](Try<T>&& t) mutable {
+ promise->setTry(std::move(t));
baton->post();
- return makeFuture(std::move(t));
});
-
- // Let's preserve the invariant that if we did not timeout (timed_wait returns
- // true), then the returned Future is complete when it is returned to the
- // caller. We need to wait out the race for that Future to complete.
+ f = std::move(ret);
if (baton->timed_wait(dur)) {
- while (!f.isReady()) {
- std::this_thread::yield();
- }
+ assert(f.isReady());
}
}
return std::move(wait().value());
}
-template <>
-inline void Future<void>::get() {
- wait().value();
-}
-
template <class T>
T Future<T>::get(Duration dur) {
wait(dur);
}
}
-template <>
-inline void Future<void>::get(Duration dur) {
- wait(dur);
- if (isReady()) {
- return;
- } else {
- throw TimedOut();
- }
-}
-
template <class T>
T Future<T>::getVia(DrivableExecutor* e) {
return std::move(waitVia(e).value());
}
-template <>
-inline void Future<void>::getVia(DrivableExecutor* e) {
- waitVia(e).value();
-}
-
namespace detail {
template <class T>
struct TryEquals {
return t1.value() == t2.value();
}
};
-
- template <>
- struct TryEquals<void> {
- static bool equals(const Try<void>& t1, const Try<void>& t2) {
- return true;
- }
- };
}
template <class T>
return then(x, std::forward<Callback>(fn));
}
+template <class F>
+inline Future<Unit> when(bool p, F thunk) {
+ return p ? thunk().unit() : makeFuture();
+}
+
+template <class P, class F>
+Future<Unit> whileDo(P predicate, F thunk) {
+ if (predicate()) {
+ return thunk().then([=] {
+ return whileDo(predicate, thunk);
+ });
+ }
+ return makeFuture();
+}
+
+template <class F>
+Future<Unit> times(const int n, F thunk) {
+ auto count = folly::makeMoveWrapper(
+ std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
+ );
+ return folly::whileDo([=]() mutable {
+ return (*count)->fetch_add(1) < n;
+ }, thunk);
+}
+
namespace futures {
template <class It, class F, class ItT, class Result>
std::vector<Future<Result>> map(It first, It last, F func) {
}
}
+namespace futures {
+
+namespace detail {
+
+struct retrying_policy_raw_tag {};
+struct retrying_policy_fut_tag {};
+
+template <class Policy>
+struct retrying_policy_traits {
+ using ew = exception_wrapper;
+ FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
+ template <class Ret>
+ using has_op = typename std::integral_constant<bool,
+ has_op_call<Policy, Ret(size_t, const ew&)>::value ||
+ has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
+ using is_raw = has_op<bool>;
+ using is_fut = has_op<Future<bool>>;
+ using tag = typename std::conditional<
+ is_raw::value, retrying_policy_raw_tag, typename std::conditional<
+ is_fut::value, retrying_policy_fut_tag, void>::type>::type;
+};
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(size_t k, Policy&& p, FF&& ff) {
+ using F = typename std::result_of<FF(size_t)>::type;
+ using T = typename F::value_type;
+ auto f = ff(k++);
+ auto pm = makeMoveWrapper(p);
+ auto ffm = makeMoveWrapper(ff);
+ return f.onError([=](exception_wrapper x) mutable {
+ auto q = (*pm)(k, x);
+ auto xm = makeMoveWrapper(std::move(x));
+ return q.then([=](bool r) mutable {
+ return r
+ ? retrying(k, pm.move(), ffm.move())
+ : makeFuture<T>(xm.move());
+ });
+ });
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
+ auto pm = makeMoveWrapper(std::move(p));
+ auto q = [=](size_t k, exception_wrapper x) {
+ return makeFuture<bool>((*pm)(k, x));
+ };
+ return retrying(0, std::move(q), std::forward<FF>(ff));
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
+ return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
+}
+
+// jittered exponential backoff, clamped to [backoff_min, backoff_max]
+template <class URNG>
+Duration retryingJitteredExponentialBackoffDur(
+ size_t n,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG& rng) {
+ using d = Duration;
+ auto dist = std::normal_distribution<double>(0.0, jitter_param);
+ auto jitter = std::exp(dist(rng));
+ auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
+ return std::max(backoff_min, std::min(backoff_max, backoff));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG rng,
+ Policy&& p) {
+ auto pm = makeMoveWrapper(std::move(p));
+ auto rngp = std::make_shared<URNG>(std::move(rng));
+ return [=](size_t n, const exception_wrapper& ex) mutable {
+ if (n == max_tries) { return makeFuture(false); }
+ return (*pm)(n, ex).then([=](bool v) {
+ if (!v) { return makeFuture(false); }
+ auto backoff = detail::retryingJitteredExponentialBackoffDur(
+ n, backoff_min, backoff_max, jitter_param, *rngp);
+ return futures::sleep(backoff).then([] { return true; });
+ });
+ };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG rng,
+ Policy&& p,
+ retrying_policy_raw_tag) {
+ auto pm = makeMoveWrapper(std::move(p));
+ auto q = [=](size_t n, const exception_wrapper& e) {
+ return makeFuture((*pm)(n, e));
+ };
+ return retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ std::move(rng),
+ std::move(q));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG rng,
+ Policy&& p,
+ retrying_policy_fut_tag) {
+ return retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ std::move(rng),
+ std::move(p));
+}
+
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff) {
+ using tag = typename detail::retrying_policy_traits<Policy>::tag;
+ return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
+}
+
+inline
+std::function<bool(size_t, const exception_wrapper&)>
+retryingPolicyBasic(
+ size_t max_tries) {
+ return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG rng,
+ Policy&& p) {
+ using tag = typename detail::retrying_policy_traits<Policy>::tag;
+ return detail::retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ std::move(rng),
+ std::move(p),
+ tag());
+}
+
+inline
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param) {
+ auto p = [](size_t, const exception_wrapper&) { return true; };
+ return retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ ThreadLocalPRNG(),
+ std::move(p));
+}
+
+}
+
// Instantiate the most common Future types to save compile time
-extern template class Future<void>;
+extern template class Future<Unit>;
extern template class Future<bool>;
extern template class Future<int>;
extern template class Future<int64_t>;
extern template class Future<double>;
} // namespace folly
-
-// I haven't included a Future<T&> specialization because I don't forsee us
-// using it, however it is not difficult to add when needed. Refer to
-// Future<void> for guidance. std::future and boost::future code would also be
-// instructive.