/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <chrono>
#include <random>
#include <thread>
+
#include <folly/Baton.h>
#include <folly/Optional.h>
#include <folly/Random.h>
-#include <folly/Traits.h>
-#include <folly/futures/detail/Core.h>
#include <folly/futures/Timekeeper.h>
+#include <folly/futures/detail/Core.h>
#if FOLLY_MOBILE || defined(__APPLE__)
#define FOLLY_FUTURE_USING_FIBER 0
class Timekeeper;
+namespace futures {
namespace detail {
#if FOLLY_FUTURE_USING_FIBER
typedef folly::fibers::Baton FutureBatonType;
#else
typedef folly::Baton<> FutureBatonType;
#endif
-}
+} // namespace detail
+} // namespace futures
+
+namespace detail {
+std::shared_ptr<Timekeeper> getTimekeeperSingleton();
+} // namespace detail
+namespace futures {
namespace detail {
- std::shared_ptr<Timekeeper> getTimekeeperSingleton();
+// Guarantees that the stored functor is destructed before the stored promise
+// may be fulfilled. Assumes the stored functor to be noexcept-destructible.
+template <typename T, typename F>
+class CoreCallbackState {
+ public:
+ template <typename FF>
+ CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
+ noexcept(F(std::declval<FF>())))
+ : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
+ assert(before_barrier());
+ }
+
+ CoreCallbackState(CoreCallbackState&& that) noexcept(
+ noexcept(F(std::declval<F>()))) {
+ if (that.before_barrier()) {
+ new (&func_) F(std::move(that.func_));
+ promise_ = that.stealPromise();
+ }
+ }
+
+ CoreCallbackState& operator=(CoreCallbackState&&) = delete;
+
+ ~CoreCallbackState() {
+ if (before_barrier()) {
+ stealPromise();
+ }
+ }
+
+ template <typename... Args>
+ auto invoke(Args&&... args) noexcept(
+ noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
+ assert(before_barrier());
+ return std::move(func_)(std::forward<Args>(args)...);
+ }
+
+ template <typename... Args>
+ auto tryInvoke(Args&&... args) noexcept {
+ return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
+ }
+
+ void setTry(Try<T>&& t) {
+ stealPromise().setTry(std::move(t));
+ }
+
+ void setException(exception_wrapper&& ew) {
+ stealPromise().setException(std::move(ew));
+ }
+
+ Promise<T> stealPromise() noexcept {
+ assert(before_barrier());
+ func_.~F();
+ return std::move(promise_);
+ }
+
+ private:
+ bool before_barrier() const noexcept {
+ return !promise_.isFulfilled();
+ }
+
+ union {
+ F func_;
+ };
+ Promise<T> promise_{Promise<T>::makeEmpty()};
+};
+
+template <typename T, typename F>
+inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
+ noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
+ std::declval<Promise<T>&&>(),
+ std::declval<F&&>()))) {
+ return CoreCallbackState<T, _t<std::decay<F>>>(
+ std::move(p), std::forward<F>(f));
+}
+} // namespace detail
+} // namespace futures
+
+template <class T>
+Future<T> Future<T>::makeEmpty() {
+ return Future<T>(futures::detail::EmptyConstruct{});
}
template <class T>
}
template <class T>
-template <typename U, typename>
-Future<T>::Future(Future<U>&& other) noexcept
- : core_(detail::Core<T>::convert(other.core_)) {
- other.core_ = nullptr;
-}
+template <
+ class T2,
+ typename std::enable_if<
+ !std::is_same<T, typename std::decay<T2>::type>::value &&
+ std::is_constructible<T, T2&&>::value &&
+ std::is_convertible<T2&&, T>::value,
+ int>::type>
+Future<T>::Future(Future<T2>&& other)
+ : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
template <class T>
-template <typename U, typename>
-Future<T>& Future<T>::operator=(Future<U>&& other) noexcept {
- std::swap(core_, detail::Core<T>::convert(other.core_));
- return *this;
+template <
+ class T2,
+ typename std::enable_if<
+ !std::is_same<T, typename std::decay<T2>::type>::value &&
+ std::is_constructible<T, T2&&>::value &&
+ !std::is_convertible<T2&&, T>::value,
+ int>::type>
+Future<T>::Future(Future<T2>&& other)
+ : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
+
+template <class T>
+template <
+ class T2,
+ typename std::enable_if<
+ !std::is_same<T, typename std::decay<T2>::type>::value &&
+ std::is_constructible<T, T2&&>::value,
+ int>::type>
+Future<T>& Future<T>::operator=(Future<T2>&& other) {
+ return operator=(
+ std::move(other).then([](T2&& v) { return T(std::move(v)); }));
}
template <class T>
template <class T2, typename>
Future<T>::Future(T2&& val)
- : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
+ : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
template <class T>
template <typename T2>
Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
- : core_(new detail::Core<T>(Try<T>(T()))) {}
+ : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
+
+template <class T>
+template <
+ class... Args,
+ typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
+ type>
+Future<T>::Future(in_place_t, Args&&... args)
+ : core_(
+ new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
+}
template <class T>
Future<T>::~Future() {
template <class T>
void Future<T>::throwIfInvalid() const {
if (!core_)
- throw NoState();
+ throwNoState();
}
template <class T>
template <class T>
template <typename F, typename R, bool isTry, typename... Args>
typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
-Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
+Future<T>::thenImplementation(
+ F&& func,
+ futures::detail::argResult<isTry, F, Args...>) {
static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
typedef typename R::ReturnsFuture::Inner B;
persist beyond the callback, if it gets moved), and so it is an
optimization to just make it shared from the get-go.
- Two subtle but important points about this design. detail::Core has no
- back pointers to Future or Promise, so if Future or Promise get moved
- (and they will be moved in performant code) we don't have to do
+ Two subtle but important points about this design. futures::detail::Core
+ has no back pointers to Future or Promise, so if Future or Promise get
+ moved (and they will be moved in performant code) we don't have to do
anything fancy. And because we store the continuation in the
- detail::Core, not in the Future, we can execute the continuation even
- after the Future has gone out of scope. This is an intentional design
+ futures::detail::Core, not in the Future, we can execute the continuation
+ even after the Future has gone out of scope. This is an intentional design
decision. It is likely we will want to be able to cancel a continuation
in some circumstances, but I think it should be explicit not implicit
in the destruction of the Future used to create it.
*/
- setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
- Try<T> && t) mutable {
- if (!isTry && t.hasException()) {
- pm.setException(std::move(t.exception()));
- } else {
- pm.setWith([&]() { return funcm(t.template get<isTry, Args>()...); });
- }
- });
+ setCallback_(
+ [state = futures::detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ if (!isTry && t.hasException()) {
+ state.setException(std::move(t.exception()));
+ } else {
+ state.setTry(makeTryWith(
+ [&] { return state.invoke(t.template get<isTry, Args>()...); }));
+ }
+ });
return f;
}
template <class T>
template <typename F, typename R, bool isTry, typename... Args>
typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
-Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
+Future<T>::thenImplementation(
+ F&& func,
+ futures::detail::argResult<isTry, F, Args...>) {
static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
typedef typename R::ReturnsFuture::Inner B;
auto f = p.getFuture();
f.core_->setExecutorNoLock(getExecutor());
- setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
- Try<T> && t) mutable {
- if (!isTry && t.hasException()) {
- pm.setException(std::move(t.exception()));
- } else {
- try {
- auto f2 = funcm(t.template get<isTry, Args>()...);
- // that didn't throw, now we can steal p
- f2.setCallback_([p = std::move(pm)](Try<B> && b) mutable {
- p.setTry(std::move(b));
- });
- } catch (const std::exception& e) {
- pm.setException(exception_wrapper(std::current_exception(), e));
- } catch (...) {
- pm.setException(exception_wrapper(std::current_exception()));
- }
- }
- });
+ setCallback_(
+ [state = futures::detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ if (!isTry && t.hasException()) {
+ state.setException(std::move(t.exception()));
+ } else {
+ auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
+ if (tf2.hasException()) {
+ state.setException(std::move(tf2.exception()));
+ } else {
+ tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
+ p.setTry(std::move(b));
+ });
+ }
+ }
+ });
return f;
}
template <typename R, typename Caller, typename... Args>
Future<typename isFuture<R>::Inner>
Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
- typedef typename std::remove_cv<
- typename std::remove_reference<
- typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
+ typedef typename std::remove_cv<typename std::remove_reference<
+ typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
+ FirstArg;
return then([instance, func](Try<T>&& t){
return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
});
}
-template <class T>
-template <class Executor, class Arg, class... Args>
-auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
- -> decltype(this->then(std::forward<Arg>(arg),
- std::forward<Args>(args)...))
-{
- auto oldX = getExecutor();
- setExecutor(x);
- return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
- via(oldX);
-}
-
template <class T>
Future<Unit> Future<T>::then() {
return then([] () {});
template <class T>
template <class F>
typename std::enable_if<
- !detail::callableWith<F, exception_wrapper>::value &&
- !detail::Extract<F>::ReturnsFuture::value,
- Future<T>>::type
+ !futures::detail::callableWith<F, exception_wrapper>::value &&
+ !futures::detail::Extract<F>::ReturnsFuture::value,
+ Future<T>>::type
Future<T>::onError(F&& func) {
- typedef typename detail::Extract<F>::FirstArg Exn;
+ typedef std::remove_reference_t<
+ typename futures::detail::Extract<F>::FirstArg>
+ Exn;
static_assert(
- std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
+ std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
"Return type of onError callback must be T or Future<T>");
Promise<T> p;
p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
auto f = p.getFuture();
- setCallback_([ funcm = std::forward<F>(func), pm = std::move(p) ](
- Try<T> && t) mutable {
- if (!t.template withException<Exn>(
- [&](Exn& e) { pm.setWith([&] { return funcm(e); }); })) {
- pm.setTry(std::move(t));
- }
- });
+ setCallback_(
+ [state = futures::detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ if (auto e = t.template tryGetExceptionObject<Exn>()) {
+ state.setTry(makeTryWith([&] { return state.invoke(*e); }));
+ } else {
+ state.setTry(std::move(t));
+ }
+ });
return f;
}
template <class T>
template <class F>
typename std::enable_if<
- !detail::callableWith<F, exception_wrapper>::value &&
- detail::Extract<F>::ReturnsFuture::value,
- Future<T>>::type
+ !futures::detail::callableWith<F, exception_wrapper>::value &&
+ futures::detail::Extract<F>::ReturnsFuture::value,
+ Future<T>>::type
Future<T>::onError(F&& func) {
static_assert(
- std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
+ std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
+ value,
"Return type of onError callback must be T or Future<T>");
- typedef typename detail::Extract<F>::FirstArg Exn;
+ typedef std::remove_reference_t<
+ typename futures::detail::Extract<F>::FirstArg>
+ Exn;
Promise<T> p;
auto f = p.getFuture();
- setCallback_([ pm = std::move(p), funcm = std::forward<F>(func) ](
- Try<T> && t) mutable {
- if (!t.template withException<Exn>([&](Exn& e) {
- try {
- auto f2 = funcm(e);
- f2.setCallback_([pm = std::move(pm)](Try<T> && t2) mutable {
- pm.setTry(std::move(t2));
+ setCallback_(
+ [state = futures::detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ if (auto e = t.template tryGetExceptionObject<Exn>()) {
+ auto tf2 = state.tryInvoke(*e);
+ if (tf2.hasException()) {
+ state.setException(std::move(tf2.exception()));
+ } else {
+ tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
+ p.setTry(std::move(t3));
});
- } catch (const std::exception& e2) {
- pm.setException(exception_wrapper(std::current_exception(), e2));
- } catch (...) {
- pm.setException(exception_wrapper(std::current_exception()));
}
- })) {
- pm.setTry(std::move(t));
- }
- });
+ } else {
+ state.setTry(std::move(t));
+ }
+ });
return f;
}
template <class F>
Future<T> Future<T>::ensure(F&& func) {
return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
- funcw();
+ std::move(funcw)();
return makeFuture(std::move(t));
});
}
template <class F>
Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
return within(dur, tk).onError([funcw = std::forward<F>(func)](
- TimedOut const&) { return funcw(); });
+ TimedOut const&) { return std::move(funcw)(); });
}
template <class T>
template <class F>
-typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
- detail::Extract<F>::ReturnsFuture::value,
- Future<T>>::type
+typename std::enable_if<
+ futures::detail::callableWith<F, exception_wrapper>::value &&
+ futures::detail::Extract<F>::ReturnsFuture::value,
+ Future<T>>::type
Future<T>::onError(F&& func) {
static_assert(
- std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
+ std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
+ value,
"Return type of onError callback must be T or Future<T>");
Promise<T> p;
auto f = p.getFuture();
setCallback_(
- [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
+ [state = futures::detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> t) mutable {
if (t.hasException()) {
- try {
- auto f2 = funcm(std::move(t.exception()));
- f2.setCallback_([pm = std::move(pm)](Try<T> t2) mutable {
- pm.setTry(std::move(t2));
+ auto tf2 = state.tryInvoke(std::move(t.exception()));
+ if (tf2.hasException()) {
+ state.setException(std::move(tf2.exception()));
+ } else {
+ tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
+ p.setTry(std::move(t3));
});
- } catch (const std::exception& e2) {
- pm.setException(exception_wrapper(std::current_exception(), e2));
- } catch (...) {
- pm.setException(exception_wrapper(std::current_exception()));
}
} else {
- pm.setTry(std::move(t));
+ state.setTry(std::move(t));
}
});
template <class T>
template <class F>
typename std::enable_if<
- detail::callableWith<F, exception_wrapper>::value &&
- !detail::Extract<F>::ReturnsFuture::value,
- Future<T>>::type
+ futures::detail::callableWith<F, exception_wrapper>::value &&
+ !futures::detail::Extract<F>::ReturnsFuture::value,
+ Future<T>>::type
Future<T>::onError(F&& func) {
static_assert(
- std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
+ std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
+ value,
"Return type of onError callback must be T or Future<T>");
Promise<T> p;
auto f = p.getFuture();
setCallback_(
- [ pm = std::move(p), funcm = std::forward<F>(func) ](Try<T> t) mutable {
+ [state = futures::detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
if (t.hasException()) {
- pm.setWith([&] { return funcm(std::move(t.exception())); });
+ state.setTry(makeTryWith(
+ [&] { return state.invoke(std::move(t.exception())); }));
} else {
- pm.setTry(std::move(t));
+ state.setTry(std::move(t));
}
});
return core_->getTry();
}
+template <class T>
+Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
+ return waitVia(e).getTry();
+}
+
template <class T>
Optional<Try<T>> Future<T>::poll() {
Optional<Try<T>> o;
template <class Func>
auto via(Executor* x, Func&& func)
- -> Future<typename isFuture<decltype(func())>::Inner>
-{
+ -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
// TODO make this actually more performant. :-P #7260175
return via(x).then(std::forward<Func>(func));
}
core_->raise(std::move(exception));
}
+template <class T>
+Future<T>::Future(futures::detail::EmptyConstruct) noexcept : core_(nullptr) {}
+
// makeFuture
template <class T>
using InnerType =
typename isFuture<typename std::result_of<F()>::type>::Inner;
try {
- return func();
+ return std::forward<F>(func)();
} catch (std::exception& e) {
return makeFuture<InnerType>(
exception_wrapper(std::current_exception(), e));
makeFutureWith(F&& func) {
using LiftedResult =
typename Unit::Lift<typename std::result_of<F()>::type>::type;
- return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
- return func();
- }));
+ return makeFuture<LiftedResult>(
+ makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
}
template <class T>
template <class T>
Future<T> makeFuture(Try<T>&& t) {
- return Future<T>(new detail::Core<T>(std::move(t)));
+ return Future<T>(new futures::detail::Core<T>(std::move(t)));
}
// via
// collectAll (variadic)
template <typename... Fs>
-typename detail::CollectAllVariadicContext<
- typename std::decay<Fs>::type::value_type...>::type
+typename futures::detail::CollectAllVariadicContext<
+ typename std::decay<Fs>::type::value_type...>::type
collectAll(Fs&&... fs) {
- auto ctx = std::make_shared<detail::CollectAllVariadicContext<
- typename std::decay<Fs>::type::value_type...>>();
- detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
- ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
+ auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
+ typename std::decay<Fs>::type::value_type...>>();
+ futures::detail::collectVariadicHelper<
+ futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
return ctx->p.getFuture();
}
typename std::iterator_traits<InputIterator>::value_type::value_type T;
struct CollectAllContext {
- CollectAllContext(int n) : results(n) {}
+ CollectAllContext(size_t n) : results(n) {}
~CollectAllContext() {
p.setValue(std::move(results));
}
std::vector<Try<T>> results;
};
- auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
+ auto ctx =
+ std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
ctx->results[i] = std::move(t);
});
// collect (iterator)
+namespace futures {
namespace detail {
template <typename T>
Nothing,
std::vector<Optional<T>>>::type;
- explicit CollectContext(int n) : result(n) {}
+ explicit CollectContext(size_t n) : result(n) {}
~CollectContext() {
if (!threw.exchange(true)) {
// map Optional<T> -> T
std::atomic<bool> threw {false};
};
-}
+} // namespace detail
+} // namespace futures
template <class InputIterator>
-Future<typename detail::CollectContext<
- typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
+Future<typename futures::detail::CollectContext<typename std::iterator_traits<
+ InputIterator>::value_type::value_type>::Result>
collect(InputIterator first, InputIterator last) {
typedef
typename std::iterator_traits<InputIterator>::value_type::value_type T;
- auto ctx = std::make_shared<detail::CollectContext<T>>(
- std::distance(first, last));
+ auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
+ std::distance(first, last));
mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
if (t.hasException()) {
if (!ctx->threw.exchange(true)) {
// collect (variadic)
template <typename... Fs>
-typename detail::CollectVariadicContext<
- typename std::decay<Fs>::type::value_type...>::type
+typename futures::detail::CollectVariadicContext<
+ typename std::decay<Fs>::type::value_type...>::type
collect(Fs&&... fs) {
- auto ctx = std::make_shared<detail::CollectVariadicContext<
- typename std::decay<Fs>::type::value_type...>>();
- detail::collectVariadicHelper<detail::CollectVariadicContext>(
- ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
+ auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
+ typename std::decay<Fs>::type::value_type...>>();
+ futures::detail::collectVariadicHelper<
+ futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
return ctx->p.getFuture();
}
typename std::iterator_traits<InputIterator>::value_type::value_type T;
struct CollectAnyContext {
- CollectAnyContext() {};
+ CollectAnyContext() {}
Promise<std::pair<size_t, Try<T>>> p;
std::atomic<bool> done {false};
};
typename std::iterator_traits<InputIterator>::value_type::value_type T;
struct CollectAnyWithoutExceptionContext {
- CollectAnyWithoutExceptionContext(){};
+ CollectAnyWithoutExceptionContext(){}
Promise<std::pair<size_t, T>> p;
std::atomic<bool> done{false};
std::atomic<size_t> nFulfilled{0};
};
auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
- ctx->nTotal = std::distance(first, last);
+ ctx->nTotal = size_t(std::distance(first, last));
mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
if (!t.hasException() && !ctx->done.exchange(true)) {
}
typedef typename std::iterator_traits<It>::value_type::value_type ItT;
- typedef
- typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
- Try<ItT>,
- ItT>::type Arg;
+ typedef typename std::conditional<
+ futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
+ Try<ItT>,
+ ItT>::type Arg;
typedef isTry<Arg> IsTry;
auto sfunc = std::make_shared<F>(std::move(func));
UnorderedReduceContext(T&& memo, F&& fn, size_t n)
: lock_(), memo_(makeFuture<T>(std::move(memo))),
func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
- {};
+ {}
folly::MicroSpinLock lock_; // protects memo_ and numThens_
Future<T> memo_;
F func_;
});
}
+namespace futures {
namespace detail {
template <class T>
assert(f.isReady());
}
-} // detail
+} // namespace detail
+} // namespace futures
template <class T>
Future<T>& Future<T>::wait() & {
- detail::waitImpl(*this);
+ futures::detail::waitImpl(*this);
return *this;
}
template <class T>
Future<T>&& Future<T>::wait() && {
- detail::waitImpl(*this);
+ futures::detail::waitImpl(*this);
return std::move(*this);
}
template <class T>
Future<T>& Future<T>::wait(Duration dur) & {
- detail::waitImpl(*this, dur);
+ futures::detail::waitImpl(*this, dur);
return *this;
}
template <class T>
Future<T>&& Future<T>::wait(Duration dur) && {
- detail::waitImpl(*this, dur);
+ futures::detail::waitImpl(*this, dur);
return std::move(*this);
}
template <class T>
Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
- detail::waitViaImpl(*this, e);
+ futures::detail::waitViaImpl(*this, e);
return *this;
}
template <class T>
Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
- detail::waitViaImpl(*this, e);
+ futures::detail::waitViaImpl(*this, e);
return std::move(*this);
}
if (isReady()) {
return std::move(value());
} else {
- throw TimedOut();
+ throwTimedOut();
}
}
return std::move(waitVia(e).value());
}
+namespace futures {
namespace detail {
- template <class T>
- struct TryEquals {
- static bool equals(const Try<T>& t1, const Try<T>& t2) {
- return t1.value() == t2.value();
- }
- };
-}
+template <class T>
+struct TryEquals {
+ static bool equals(const Try<T>& t1, const Try<T>& t2) {
+ return t1.value() == t2.value();
+ }
+};
+} // namespace detail
+} // namespace futures
template <class T>
Future<bool> Future<T>::willEqual(Future<T>& f) {
return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
- return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
+ return futures::detail::TryEquals<T>::equals(
+ std::get<0>(t), std::get<1>(t));
} else {
return false;
}
return this->then([p = std::forward<F>(predicate)](T val) {
T const& valConstRef = val;
if (!p(valConstRef)) {
- throw PredicateDoesNotObtain();
+ throwPredicateDoesNotObtain();
}
return val;
});
}
-template <class T>
-template <class Callback>
-auto Future<T>::thenMulti(Callback&& fn)
- -> decltype(this->then(std::forward<Callback>(fn))) {
- // thenMulti with one callback is just a then
- return then(std::forward<Callback>(fn));
-}
-
-template <class T>
-template <class Callback, class... Callbacks>
-auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
- -> decltype(this->then(std::forward<Callback>(fn)).
- thenMulti(std::forward<Callbacks>(fns)...)) {
- // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
- return then(std::forward<Callback>(fn)).
- thenMulti(std::forward<Callbacks>(fns)...);
-}
-
-template <class T>
-template <class Callback, class... Callbacks>
-auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
- Callbacks&&... fns)
- -> decltype(this->then(std::forward<Callback>(fn)).
- thenMulti(std::forward<Callbacks>(fns)...)) {
- // thenMultiExecutor with two callbacks is
- // via(x).then(a).thenMulti(b, ...).via(oldX)
- auto oldX = getExecutor();
- setExecutor(x);
- return then(std::forward<Callback>(fn)).
- thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
-}
-
-template <class T>
-template <class Callback>
-auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
- -> decltype(this->then(std::forward<Callback>(fn))) {
- // thenMulti with one callback is just a then with an executor
- return then(x, std::forward<Callback>(fn));
-}
-
template <class F>
inline Future<Unit> when(bool p, F&& thunk) {
return p ? std::forward<F>(thunk)().unit() : makeFuture();
template <class F>
Future<Unit> times(const int n, F&& thunk) {
return folly::whileDo(
- [ n, count = folly::make_unique<std::atomic<int>>(0) ]() mutable {
+ [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
return count->fetch_add(1) < n;
},
std::forward<F>(thunk));
template <class Policy>
struct retrying_policy_traits {
- using ew = exception_wrapper;
- FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
- template <class Ret>
- using has_op = typename std::integral_constant<bool,
- has_op_call<Policy, Ret(size_t, const ew&)>::value ||
- has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
- using is_raw = has_op<bool>;
- using is_fut = has_op<Future<bool>>;
+ using result = std::result_of_t<Policy(size_t, const exception_wrapper&)>;
+ using is_raw = std::is_same<result, bool>;
+ using is_fut = std::is_same<result, Future<bool>>;
using tag = typename std::conditional<
is_raw::value, retrying_policy_raw_tag, typename std::conditional<
is_fut::value, retrying_policy_fut_tag, void>::type>::type;
};
+template <class Policy, class FF, class Prom>
+void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
+ using F = typename std::result_of<FF(size_t)>::type;
+ using T = typename F::value_type;
+ auto f = makeFutureWith([&] { return ff(k++); });
+ f.then([
+ k,
+ prom = std::move(prom),
+ pm = std::forward<Policy>(p),
+ ffm = std::forward<FF>(ff)
+ ](Try<T> && t) mutable {
+ if (t.hasValue()) {
+ prom.setValue(std::move(t).value());
+ return;
+ }
+ auto& x = t.exception();
+ auto q = pm(k, x);
+ q.then([
+ k,
+ prom = std::move(prom),
+ xm = std::move(x),
+ pm = std::move(pm),
+ ffm = std::move(ffm)
+ ](bool shouldRetry) mutable {
+ if (shouldRetry) {
+ retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
+ } else {
+ prom.setException(std::move(xm));
+ };
+ });
+ });
+}
+
template <class Policy, class FF>
typename std::result_of<FF(size_t)>::type
retrying(size_t k, Policy&& p, FF&& ff) {
using F = typename std::result_of<FF(size_t)>::type;
using T = typename F::value_type;
- auto f = ff(k++);
- return f.onError(
- [ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
- exception_wrapper x) mutable {
- auto q = pm(k, x);
- return q.then(
- [ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
- bool r) mutable {
- return r ? retrying(k, std::move(pm), std::move(ffm))
- : makeFuture<T>(std::move(xm));
- });
- });
+ auto prom = Promise<T>();
+ auto f = prom.getFuture();
+ retryingImpl(
+ k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
+ return f;
}
template <class Policy, class FF>