/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
}
namespace detail {
- std::shared_ptr<Timekeeper> getTimekeeperSingleton();
+std::shared_ptr<Timekeeper> getTimekeeperSingleton();
+
+// Guarantees that the stored functor is destructed before the stored promise
+// may be fulfilled. Assumes the stored functor to be noexcept-destructible.
+template <typename T, typename F>
+class CoreCallbackState {
+ public:
+ template <typename FF>
+ CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
+ noexcept(F(std::declval<FF>())))
+ : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
+ assert(before_barrier());
+ }
+
+ CoreCallbackState(CoreCallbackState&& that) noexcept(
+ noexcept(F(std::declval<F>()))) {
+ if (that.before_barrier()) {
+ new (&func_) F(std::move(that.func_));
+ promise_ = that.stealPromise();
+ }
+ }
+
+ CoreCallbackState& operator=(CoreCallbackState&&) = delete;
+
+ ~CoreCallbackState() {
+ if (before_barrier()) {
+ stealPromise();
+ }
+ }
+
+ template <typename... Args>
+ auto invoke(Args&&... args) noexcept(
+ noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
+ assert(before_barrier());
+ return std::move(func_)(std::forward<Args>(args)...);
+ }
+
+ template <typename... Args>
+ auto tryInvoke(Args&&... args) noexcept {
+ return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
+ }
+
+ void setTry(Try<T>&& t) {
+ stealPromise().setTry(std::move(t));
+ }
+
+ void setException(exception_wrapper&& ew) {
+ stealPromise().setException(std::move(ew));
+ }
+
+ Promise<T> stealPromise() noexcept {
+ assert(before_barrier());
+ func_.~F();
+ return std::move(promise_);
+ }
+
+ private:
+ bool before_barrier() const noexcept {
+ return !promise_.isFulfilled();
+ }
+
+ union {
+ F func_;
+ };
+ Promise<T> promise_{detail::EmptyConstruct{}};
+};
+
+template <typename T, typename F>
+inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
+ noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
+ std::declval<Promise<T>&&>(),
+ std::declval<F&&>()))) {
+ return CoreCallbackState<T, _t<std::decay<F>>>(
+ std::move(p), std::forward<F>(f));
+}
}
template <class T>
return *this;
}
-template <class T>
-template <typename U, typename>
-Future<T>::Future(Future<U>&& other) noexcept
- : core_(detail::Core<T>::convert(other.core_)) {
- other.core_ = nullptr;
-}
-
-template <class T>
-template <typename U, typename>
-Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
- std::swap(core_, detail::Core<T>::convert(other.core_));
- return *this;
-}
-
template <class T>
template <class T2, typename>
Future<T>::Future(T2&& val)
in some circumstances, but I think it should be explicit not implicit
in the destruction of the Future used to create it.
*/
- setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
- Try<T> && t) mutable {
- if (!isTry && t.hasException()) {
- pm.setException(std::move(t.exception()));
- } else {
- pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
- }
- });
+ setCallback_(
+ [state = detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ if (!isTry && t.hasException()) {
+ state.setException(std::move(t.exception()));
+ } else {
+ state.setTry(makeTryWith(
+ [&] { return state.invoke(t.template get<isTry, Args>()...); }));
+ }
+ });
return f;
}
auto f = p.getFuture();
f.core_->setExecutorNoLock(getExecutor());
- setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
- Try<T> && t) mutable {
- if (!isTry && t.hasException()) {
- pm.setException(std::move(t.exception()));
- } else {
- try {
- auto f2 = funcm(t.template get<isTry, Args>()...);
- // that didn't throw, now we can steal p
- f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
- p.setTry(std::move(b));
- });
- } catch (const std::exception& e) {
- pm.setException(exception_wrapper(std::current_exception(), e));
- } catch (...) {
- pm.setException(exception_wrapper(std::current_exception()));
- }
- }
- });
+ setCallback_(
+ [state = detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ if (!isTry && t.hasException()) {
+ state.setException(std::move(t.exception()));
+ } else {
+ auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
+ if (tf2.hasException()) {
+ state.setException(std::move(tf2.exception()));
+ } else {
+ tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
+ p.setTry(std::move(b));
+ });
+ }
+ }
+ });
return f;
}
!detail::Extract<F>::ReturnsFuture::value,
Future<T>>::type
Future<T>::onError(F&& func) {
- typedef typename detail::Extract<F>::FirstArg Exn;
+ typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
static_assert(
std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
"Return type of onError callback must be T or Future<T>");
p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
auto f = p.getFuture();
- setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
- Try<T> && t) mutable {
- if (!t.template withException<Exn>(
- [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
- pm.setTry(std::move(t));
- }
- });
+ setCallback_(
+ [state = detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ if (auto e = t.template tryGetExceptionObject<Exn>()) {
+ state.setTry(makeTryWith([&] { return state.invoke(*e); }));
+ } else {
+ state.setTry(std::move(t));
+ }
+ });
return f;
}
static_assert(
std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
"Return type of onError callback must be T or Future<T>");
- typedef typename detail::Extract<F>::FirstArg Exn;
+ typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
Promise<T> p;
auto f = p.getFuture();
- setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
- Try<T> && t) mutable {
- if (!t.template withException<Exn>([&](Exn& e) {
- try {
- auto f2 = funcm(e);
- f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
- pm.setTry(std::move(t2));
+ setCallback_(
+ [state = detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ if (auto e = t.template tryGetExceptionObject<Exn>()) {
+ auto tf2 = state.tryInvoke(*e);
+ if (tf2.hasException()) {
+ state.setException(std::move(tf2.exception()));
+ } else {
+ tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
+ p.setTry(std::move(t3));
});
- } catch (const std::exception& e2) {
- pm.setException(exception_wrapper(std::current_exception(), e2));
- } catch (...) {
- pm.setException(exception_wrapper(std::current_exception()));
}
- })) {
- pm.setTry(std::move(t));
- }
- });
+ } else {
+ state.setTry(std::move(t));
+ }
+ });
return f;
}
template <class F>
Future<T> Future<T>::ensure(F&& func) {
return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
- funcw();
+ std::move(funcw)();
return makeFuture(std::move(t));
});
}
template <class F>
Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
return within(dur, tk).onError([funcw = std::forward<F>(func)](
- TimedOut const&) { return funcw(); });
+ TimedOut const&) { return std::move(funcw)(); });
}
template <class T>
Promise<T> p;
auto f = p.getFuture();
setCallback_(
- [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
+ [state = detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> t) mutable {
if (t.hasException()) {
- try {
- auto f2 = funcm(std::move(t.exception()));
- f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
- pm.setTry(std::move(t2));
+ auto tf2 = state.tryInvoke(std::move(t.exception()));
+ if (tf2.hasException()) {
+ state.setException(std::move(tf2.exception()));
+ } else {
+ tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
+ p.setTry(std::move(t3));
});
- } catch (const std::exception& e2) {
- pm.setException(exception_wrapper(std::current_exception(), e2));
- } catch (...) {
- pm.setException(exception_wrapper(std::current_exception()));
}
} else {
- pm.setTry(std::move(t));
+ state.setTry(std::move(t));
}
});
Promise<T> p;
auto f = p.getFuture();
setCallback_(
- [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
+ [state = detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
if (t.hasException()) {
- pm.setWith([&] { return funcm(std::move(t.exception())); });
+ state.setTry(makeTryWith(
+ [&] { return state.invoke(std::move(t.exception())); }));
} else {
- pm.setTry(std::move(t));
+ state.setTry(std::move(t));
}
});
return core_->getTry();
}
+template <class T>
+Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
+ return waitVia(e).getTry();
+}
+
template <class T>
Optional<Try<T>> Future<T>::poll() {
Optional<Try<T>> o;
template <class Func>
auto via(Executor* x, Func&& func)
- -> Future<typename isFuture<decltype(func())>::Inner>
-{
+ -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
// TODO make this actually more performant. :-P #7260175
return via(x).then(std::forward<Func>(func));
}
using InnerType =
typename isFuture<typename std::result_of<F()>::type>::Inner;
try {
- return func();
+ return std::forward<F>(func)();
} catch (std::exception& e) {
return makeFuture<InnerType>(
exception_wrapper(std::current_exception(), e));
makeFutureWith(F&& func) {
using LiftedResult =
typename Unit::Lift<typename std::result_of<F()>::type>::type;
- return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
- return func();
- }));
+ return makeFuture<LiftedResult>(
+ makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
}
template <class T>
auto ctx = std::make_shared<detail::CollectAllVariadicContext<
typename std::decay<Fs>::type::value_type...>>();
detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
- ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
+ ctx, std::forward<Fs>(fs)...);
return ctx->p.getFuture();
}
typename std::iterator_traits<InputIterator>::value_type::value_type T;
struct CollectAllContext {
- CollectAllContext(int n) : results(n) {}
+ CollectAllContext(size_t n) : results(n) {}
~CollectAllContext() {
p.setValue(std::move(results));
}
std::vector<Try<T>> results;
};
- auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
+ auto ctx =
+ std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
ctx->results[i] = std::move(t);
});
Nothing,
std::vector<Optional<T>>>::type;
- explicit CollectContext(int n) : result(n) {}
+ explicit CollectContext(size_t n) : result(n) {}
~CollectContext() {
if (!threw.exchange(true)) {
// map Optional<T> -> T
auto ctx = std::make_shared<detail::CollectVariadicContext<
typename std::decay<Fs>::type::value_type...>>();
detail::collectVariadicHelper<detail::CollectVariadicContext>(
- ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
+ ctx, std::forward<Fs>(fs)...);
return ctx->p.getFuture();
}
typename std::iterator_traits<InputIterator>::value_type::value_type T;
struct CollectAnyContext {
- CollectAnyContext() {};
+ CollectAnyContext() {}
Promise<std::pair<size_t, Try<T>>> p;
std::atomic<bool> done {false};
};
typename std::iterator_traits<InputIterator>::value_type::value_type T;
struct CollectAnyWithoutExceptionContext {
- CollectAnyWithoutExceptionContext(){};
+ CollectAnyWithoutExceptionContext(){}
Promise<std::pair<size_t, T>> p;
std::atomic<bool> done{false};
std::atomic<size_t> nFulfilled{0};
};
auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
- ctx->nTotal = std::distance(first, last);
+ ctx->nTotal = size_t(std::distance(first, last));
mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
if (!t.hasException() && !ctx->done.exchange(true)) {
UnorderedReduceContext(T&& memo, F&& fn, size_t n)
: lock_(), memo_(makeFuture<T>(std::move(memo))),
func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
- {};
+ {}
folly::MicroSpinLock lock_; // protects memo_ and numThens_
Future<T> memo_;
F func_;
template <class T>
Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
return collectAll(*this, futures::sleep(dur, tk))
- .then([](std::tuple<Try<T>, Try<Unit>> tup) {
- Try<T>& t = std::get<0>(tup);
- return makeFuture<T>(std::move(t));
- })
- .via(getExecutor());
+ .then([](std::tuple<Try<T>, Try<Unit>> tup) {
+ Try<T>& t = std::get<0>(tup);
+ return makeFuture<T>(std::move(t));
+ });
}
namespace detail {
template <class F>
Future<Unit> times(const int n, F&& thunk) {
return folly::whileDo(
- [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
+ [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
return count->fetch_add(1) < n;
},
std::forward<F>(thunk));
is_fut::value, retrying_policy_fut_tag, void>::type>::type;
};
+template <class Policy, class FF, class Prom>
+void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
+ using F = typename std::result_of<FF(size_t)>::type;
+ using T = typename F::value_type;
+ auto f = makeFutureWith([&] { return ff(k++); });
+ f.then([
+ k,
+ prom = std::move(prom),
+ pm = std::forward<Policy>(p),
+ ffm = std::forward<FF>(ff)
+ ](Try<T> && t) mutable {
+ if (t.hasValue()) {
+ prom.setValue(std::move(t).value());
+ return;
+ }
+ auto& x = t.exception();
+ auto q = pm(k, x);
+ q.then([
+ k,
+ prom = std::move(prom),
+ xm = std::move(x),
+ pm = std::move(pm),
+ ffm = std::move(ffm)
+ ](bool shouldRetry) mutable {
+ if (shouldRetry) {
+ retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
+ } else {
+ prom.setException(std::move(xm));
+ };
+ });
+ });
+}
+
template <class Policy, class FF>
typename std::result_of<FF(size_t)>::type
retrying(size_t k, Policy&& p, FF&& ff) {
using F = typename std::result_of<FF(size_t)>::type;
using T = typename F::value_type;
- auto f = ff(k++);
- return f.onError(
- [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
- exception_wrapper x) mutable {
- auto q = pm(k, x);
- return q.then(
- [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
- bool r) mutable {
- return r ? retrying(k, std::move(pm), std::move(ffm))
- : makeFuture<T>(std::move(xm));
- });
- });
+ auto prom = Promise<T>();
+ auto f = prom.getFuture();
+ retryingImpl(
+ k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
+ return f;
}
template <class Policy, class FF>