/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#pragma once
+#include <algorithm>
+#include <cassert>
#include <chrono>
+#include <random>
#include <thread>
-
-#include <folly/experimental/fibers/Baton.h>
+#include <folly/Baton.h>
#include <folly/Optional.h>
+#include <folly/Random.h>
#include <folly/futures/detail/Core.h>
#include <folly/futures/Timekeeper.h>
+#if FOLLY_MOBILE || defined(__APPLE__)
+#define FOLLY_FUTURE_USING_FIBER 0
+#else
+#define FOLLY_FUTURE_USING_FIBER 1
+#include <folly/fibers/Baton.h>
+#endif
+
namespace folly {
class Timekeeper;
namespace detail {
- Timekeeper* getTimekeeperSingleton();
+#if FOLLY_FUTURE_USING_FIBER
+typedef folly::fibers::Baton FutureBatonType;
+#else
+typedef folly::Baton<> FutureBatonType;
+#endif
+}
+
+namespace detail {
+std::shared_ptr<Timekeeper> getTimekeeperSingleton();
+
+// Guarantees that the stored functor is destructed before the stored promise
+// may be fulfilled. Assumes the stored functor to be noexcept-destructible.
+template <typename T, typename F>
+class CoreCallbackState {
+ public:
+ template <typename FF>
+ CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
+ noexcept(F(std::declval<FF>())))
+ : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
+ assert(before_barrier());
+ }
+
+ CoreCallbackState(CoreCallbackState&& that) noexcept(
+ noexcept(F(std::declval<F>()))) {
+ if (that.before_barrier()) {
+ new (&func_) F(std::move(that.func_));
+ promise_ = that.stealPromise();
+ }
+ }
+
+ CoreCallbackState& operator=(CoreCallbackState&&) = delete;
+
+ ~CoreCallbackState() {
+ if (before_barrier()) {
+ stealPromise();
+ }
+ }
+
+ template <typename... Args>
+ auto invoke(Args&&... args) noexcept(
+ noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
+ assert(before_barrier());
+ return std::move(func_)(std::forward<Args>(args)...);
+ }
+
+ template <typename... Args>
+ auto tryInvoke(Args&&... args) noexcept {
+ return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
+ }
+
+ void setTry(Try<T>&& t) {
+ stealPromise().setTry(std::move(t));
+ }
+
+ void setException(exception_wrapper&& ew) {
+ stealPromise().setException(std::move(ew));
+ }
+
+ Promise<T> stealPromise() noexcept {
+ assert(before_barrier());
+ func_.~F();
+ return std::move(promise_);
+ }
+
+ private:
+ bool before_barrier() const noexcept {
+ return !promise_.isFulfilled();
+ }
+
+ union {
+ F func_;
+ };
+ Promise<T> promise_{Promise<T>::makeEmpty()};
+};
+
+template <typename T, typename F>
+inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
+ noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
+ std::declval<Promise<T>&&>(),
+ std::declval<F&&>()))) {
+ return CoreCallbackState<T, _t<std::decay<F>>>(
+ std::move(p), std::forward<F>(f));
+}
+}
+
+template <class T>
+Future<T> Future<T>::makeEmpty() {
+ return Future<T>(detail::EmptyConstruct{});
}
template <class T>
return *this;
}
+template <class T>
+template <
+ class T2,
+ typename std::enable_if<
+ !std::is_same<T, typename std::decay<T2>::type>::value &&
+ std::is_constructible<T, T2&&>::value &&
+ std::is_convertible<T2&&, T>::value,
+ int>::type>
+Future<T>::Future(Future<T2>&& other)
+ : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
+
+template <class T>
+template <
+ class T2,
+ typename std::enable_if<
+ !std::is_same<T, typename std::decay<T2>::type>::value &&
+ std::is_constructible<T, T2&&>::value &&
+ !std::is_convertible<T2&&, T>::value,
+ int>::type>
+Future<T>::Future(Future<T2>&& other)
+ : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
+
+template <class T>
+template <
+ class T2,
+ typename std::enable_if<
+ !std::is_same<T, typename std::decay<T2>::type>::value &&
+ std::is_constructible<T, T2&&>::value,
+ int>::type>
+Future<T>& Future<T>::operator=(Future<T2>&& other) {
+ return operator=(
+ std::move(other).then([](T2&& v) { return T(std::move(v)); }));
+}
+
template <class T>
template <class T2, typename>
Future<T>::Future(T2&& val)
- : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
+ : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
template <class T>
-template <typename, typename>
-Future<T>::Future()
- : core_(new detail::Core<T>(Try<T>())) {}
+template <typename T2>
+Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
+ : core_(new detail::Core<T>(Try<T>(T()))) {}
template <class T>
Future<T>::~Future() {
template <class F>
void Future<T>::setCallback_(F&& func) {
throwIfInvalid();
- core_->setCallback(std::move(func));
+ core_->setCallback(std::forward<F>(func));
}
// unwrap
template <class T>
template <typename F, typename R, bool isTry, typename... Args>
typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
-Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
+Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
typedef typename R::ReturnsFuture::Inner B;
throwIfInvalid();
- // wrap these so we can move them into the lambda
- folly::MoveWrapper<Promise<B>> p;
- p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
- folly::MoveWrapper<F> funcm(std::forward<F>(func));
+ Promise<B> p;
+ p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
// grab the Future now before we lose our handle on the Promise
- auto f = p->getFuture();
+ auto f = p.getFuture();
f.core_->setExecutorNoLock(getExecutor());
/* This is a bit tricky.
persist beyond the callback, if it gets moved), and so it is an
optimization to just make it shared from the get-go.
- We have to move in the Promise and func using the MoveWrapper
- hack. (func could be copied but it's a big drag on perf).
-
Two subtle but important points about this design. detail::Core has no
back pointers to Future or Promise, so if Future or Promise get moved
(and they will be moved in performant code) we don't have to do
in the destruction of the Future used to create it.
*/
setCallback_(
- [p, funcm](Try<T>&& t) mutable {
- if (!isTry && t.hasException()) {
- p->setException(std::move(t.exception()));
- } else {
- p->setWith([&]() {
- return (*funcm)(t.template get<isTry, Args>()...);
- });
- }
- });
+ [state = detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ if (!isTry && t.hasException()) {
+ state.setException(std::move(t.exception()));
+ } else {
+ state.setTry(makeTryWith(
+ [&] { return state.invoke(t.template get<isTry, Args>()...); }));
+ }
+ });
return f;
}
template <class T>
template <typename F, typename R, bool isTry, typename... Args>
typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
-Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
+Future<T>::thenImplementation(F&& func, detail::argResult<isTry, F, Args...>) {
static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
typedef typename R::ReturnsFuture::Inner B;
throwIfInvalid();
- // wrap these so we can move them into the lambda
- folly::MoveWrapper<Promise<B>> p;
- p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
- folly::MoveWrapper<F> funcm(std::forward<F>(func));
+ Promise<B> p;
+ p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
// grab the Future now before we lose our handle on the Promise
- auto f = p->getFuture();
+ auto f = p.getFuture();
f.core_->setExecutorNoLock(getExecutor());
setCallback_(
- [p, funcm](Try<T>&& t) mutable {
- if (!isTry && t.hasException()) {
- p->setException(std::move(t.exception()));
- } else {
- try {
- auto f2 = (*funcm)(t.template get<isTry, Args>()...);
- // that didn't throw, now we can steal p
- f2.setCallback_([p](Try<B>&& b) mutable {
- p->setTry(std::move(b));
- });
- } catch (const std::exception& e) {
- p->setException(exception_wrapper(std::current_exception(), e));
- } catch (...) {
- p->setException(exception_wrapper(std::current_exception()));
+ [state = detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ if (!isTry && t.hasException()) {
+ state.setException(std::move(t.exception()));
+ } else {
+ auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
+ if (tf2.hasException()) {
+ state.setException(std::move(tf2.exception()));
+ } else {
+ tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
+ p.setTry(std::move(b));
+ });
+ }
}
- }
- });
+ });
return f;
}
}
template <class T>
-template <class Executor, class Arg, class... Args>
-auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
- -> decltype(this->then(std::forward<Arg>(arg),
- std::forward<Args>(args)...))
-{
- auto oldX = getExecutor();
- setExecutor(x);
- return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
- via(oldX);
-}
-
-template <class T>
-Future<void> Future<T>::then() {
+Future<Unit> Future<T>::then() {
return then([] () {});
}
!detail::Extract<F>::ReturnsFuture::value,
Future<T>>::type
Future<T>::onError(F&& func) {
- typedef typename detail::Extract<F>::FirstArg Exn;
+ typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
static_assert(
std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
"Return type of onError callback must be T or Future<T>");
Promise<T> p;
+ p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
auto f = p.getFuture();
- auto pm = folly::makeMoveWrapper(std::move(p));
- auto funcm = folly::makeMoveWrapper(std::move(func));
- setCallback_([pm, funcm](Try<T>&& t) mutable {
- if (!t.template withException<Exn>([&] (Exn& e) {
- pm->setWith([&]{
- return (*funcm)(e);
- });
- })) {
- pm->setTry(std::move(t));
- }
- });
+
+ setCallback_(
+ [state = detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ if (auto e = t.template tryGetExceptionObject<Exn>()) {
+ state.setTry(makeTryWith([&] { return state.invoke(*e); }));
+ } else {
+ state.setTry(std::move(t));
+ }
+ });
return f;
}
static_assert(
std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
"Return type of onError callback must be T or Future<T>");
- typedef typename detail::Extract<F>::FirstArg Exn;
+ typedef std::remove_reference_t<typename detail::Extract<F>::FirstArg> Exn;
Promise<T> p;
auto f = p.getFuture();
- auto pm = folly::makeMoveWrapper(std::move(p));
- auto funcm = folly::makeMoveWrapper(std::move(func));
- setCallback_([pm, funcm](Try<T>&& t) mutable {
- if (!t.template withException<Exn>([&] (Exn& e) {
- try {
- auto f2 = (*funcm)(e);
- f2.setCallback_([pm](Try<T>&& t2) mutable {
- pm->setTry(std::move(t2));
+
+ setCallback_(
+ [state = detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ if (auto e = t.template tryGetExceptionObject<Exn>()) {
+ auto tf2 = state.tryInvoke(*e);
+ if (tf2.hasException()) {
+ state.setException(std::move(tf2.exception()));
+ } else {
+ tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
+ p.setTry(std::move(t3));
});
- } catch (const std::exception& e2) {
- pm->setException(exception_wrapper(std::current_exception(), e2));
- } catch (...) {
- pm->setException(exception_wrapper(std::current_exception()));
}
- })) {
- pm->setTry(std::move(t));
- }
- });
+ } else {
+ state.setTry(std::move(t));
+ }
+ });
return f;
}
template <class T>
template <class F>
-Future<T> Future<T>::ensure(F func) {
- MoveWrapper<F> funcw(std::move(func));
- return this->then([funcw](Try<T>&& t) {
- (*funcw)();
+Future<T> Future<T>::ensure(F&& func) {
+ return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
+ std::move(funcw)();
return makeFuture(std::move(t));
});
}
template <class T>
template <class F>
Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
- auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
- return within(dur, tk)
- .onError([funcw](TimedOut const&) { return (*funcw)(); });
+ return within(dur, tk).onError([funcw = std::forward<F>(func)](
+ TimedOut const&) { return std::move(funcw)(); });
}
template <class T>
template <class F>
-typename std::enable_if<
- detail::callableWith<F, exception_wrapper>::value &&
- detail::Extract<F>::ReturnsFuture::value,
- Future<T>>::type
+typename std::enable_if<detail::callableWith<F, exception_wrapper>::value &&
+ detail::Extract<F>::ReturnsFuture::value,
+ Future<T>>::type
Future<T>::onError(F&& func) {
static_assert(
std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
Promise<T> p;
auto f = p.getFuture();
- auto pm = folly::makeMoveWrapper(std::move(p));
- auto funcm = folly::makeMoveWrapper(std::move(func));
- setCallback_([pm, funcm](Try<T> t) mutable {
- if (t.hasException()) {
- try {
- auto f2 = (*funcm)(std::move(t.exception()));
- f2.setCallback_([pm](Try<T> t2) mutable {
- pm->setTry(std::move(t2));
- });
- } catch (const std::exception& e2) {
- pm->setException(exception_wrapper(std::current_exception(), e2));
- } catch (...) {
- pm->setException(exception_wrapper(std::current_exception()));
- }
- } else {
- pm->setTry(std::move(t));
- }
- });
+ setCallback_(
+ [state = detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> t) mutable {
+ if (t.hasException()) {
+ auto tf2 = state.tryInvoke(std::move(t.exception()));
+ if (tf2.hasException()) {
+ state.setException(std::move(tf2.exception()));
+ } else {
+ tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
+ p.setTry(std::move(t3));
+ });
+ }
+ } else {
+ state.setTry(std::move(t));
+ }
+ });
return f;
}
Promise<T> p;
auto f = p.getFuture();
- auto pm = folly::makeMoveWrapper(std::move(p));
- auto funcm = folly::makeMoveWrapper(std::move(func));
- setCallback_([pm, funcm](Try<T> t) mutable {
- if (t.hasException()) {
- pm->setWith([&]{
- return (*funcm)(std::move(t.exception()));
+ setCallback_(
+ [state = detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> && t) mutable {
+ if (t.hasException()) {
+ state.setTry(makeTryWith(
+ [&] { return state.invoke(std::move(t.exception())); }));
+ } else {
+ state.setTry(std::move(t));
+ }
});
- } else {
- pm->setTry(std::move(t));
- }
- });
return f;
}
return core_->getTry();
}
+template <class T>
+Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
+ return waitVia(e).getTry();
+}
+
template <class T>
Optional<Try<T>> Future<T>::poll() {
Optional<Try<T>> o;
inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
throwIfInvalid();
- MoveWrapper<Promise<T>> p;
- auto f = p->getFuture();
- then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
+ Promise<T> p;
+ auto f = p.getFuture();
+ then([p = std::move(p)](Try<T> && t) mutable { p.setTry(std::move(t)); });
return std::move(f).via(executor, priority);
}
-
template <class Func>
-auto via(Executor* x, Func func)
- -> Future<typename isFuture<decltype(func())>::Inner>
-// this would work, if not for Future<void> :-/
-// -> decltype(via(x).then(func))
-{
+auto via(Executor* x, Func&& func)
+ -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
// TODO make this actually more performant. :-P #7260175
- return via(x).then(func);
+ return via(x).then(std::forward<Func>(func));
}
template <class T>
core_->raise(std::move(exception));
}
+template <class T>
+Future<T>::Future(detail::EmptyConstruct) noexcept
+ : core_(nullptr) {}
+
// makeFuture
template <class T>
}
inline // for multiple translation units
-Future<void> makeFuture() {
- return makeFuture(Try<void>());
+Future<Unit> makeFuture() {
+ return makeFuture(Unit{});
}
+// makeFutureWith(Future<T>()) -> Future<T>
template <class F>
-auto makeFutureWith(
- F&& func,
- typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
- -> Future<decltype(func())> {
- return makeFuture(makeTryWith([&func]() {
- return (func)();
- }));
+typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
+ typename std::result_of<F()>::type>::type
+makeFutureWith(F&& func) {
+ using InnerType =
+ typename isFuture<typename std::result_of<F()>::type>::Inner;
+ try {
+ return std::forward<F>(func)();
+ } catch (std::exception& e) {
+ return makeFuture<InnerType>(
+ exception_wrapper(std::current_exception(), e));
+ } catch (...) {
+ return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
+ }
}
+// makeFutureWith(T()) -> Future<T>
+// makeFutureWith(void()) -> Future<Unit>
template <class F>
-auto makeFutureWith(F const& func) -> Future<decltype(func())> {
- F copy = func;
- return makeFuture(makeTryWith(std::move(copy)));
+typename std::enable_if<
+ !(isFuture<typename std::result_of<F()>::type>::value),
+ Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
+makeFutureWith(F&& func) {
+ using LiftedResult =
+ typename Unit::Lift<typename std::result_of<F()>::type>::type;
+ return makeFuture<LiftedResult>(
+ makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
}
template <class T>
}
// via
-Future<void> via(Executor* executor, int8_t priority) {
+Future<Unit> via(Executor* executor, int8_t priority) {
return makeFuture().via(executor, priority);
}
auto ctx = std::make_shared<detail::CollectAllVariadicContext<
typename std::decay<Fs>::type::value_type...>>();
detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
- ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
+ ctx, std::forward<Fs>(fs)...);
return ctx->p.getFuture();
}
typename std::iterator_traits<InputIterator>::value_type::value_type T;
struct CollectAllContext {
- CollectAllContext(int n) : results(n) {}
+ CollectAllContext(size_t n) : results(n) {}
~CollectAllContext() {
p.setValue(std::move(results));
}
std::vector<Try<T>> results;
};
- auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
+ auto ctx =
+ std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
ctx->results[i] = std::move(t);
});
template <typename T>
struct CollectContext {
- struct Nothing { explicit Nothing(int n) {} };
+ struct Nothing {
+ explicit Nothing(int /* n */) {}
+ };
using Result = typename std::conditional<
std::is_void<T>::value,
Nothing,
std::vector<Optional<T>>>::type;
- explicit CollectContext(int n) : result(n) {}
+ explicit CollectContext(size_t n) : result(n) {}
~CollectContext() {
if (!threw.exchange(true)) {
// map Optional<T> -> T
std::atomic<bool> threw {false};
};
-// Specialize for void (implementations in Future.cpp)
-
-template <>
-CollectContext<void>::~CollectContext();
-
-template <>
-void CollectContext<void>::setPartialResult(size_t i, Try<void>& t);
-
}
template <class InputIterator>
auto ctx = std::make_shared<detail::CollectVariadicContext<
typename std::decay<Fs>::type::value_type...>>();
detail::collectVariadicHelper<detail::CollectVariadicContext>(
- ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
+ ctx, std::forward<Fs>(fs)...);
return ctx->p.getFuture();
}
typename std::iterator_traits<InputIterator>::value_type::value_type T;
struct CollectAnyContext {
- CollectAnyContext() {};
+ CollectAnyContext() {}
Promise<std::pair<size_t, Try<T>>> p;
std::atomic<bool> done {false};
};
return ctx->p.getFuture();
}
+// collectAnyWithoutException (iterator)
+
+template <class InputIterator>
+Future<std::pair<
+ size_t,
+ typename std::iterator_traits<InputIterator>::value_type::value_type>>
+collectAnyWithoutException(InputIterator first, InputIterator last) {
+ typedef
+ typename std::iterator_traits<InputIterator>::value_type::value_type T;
+
+ struct CollectAnyWithoutExceptionContext {
+ CollectAnyWithoutExceptionContext(){}
+ Promise<std::pair<size_t, T>> p;
+ std::atomic<bool> done{false};
+ std::atomic<size_t> nFulfilled{0};
+ size_t nTotal;
+ };
+
+ auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
+ ctx->nTotal = size_t(std::distance(first, last));
+
+ mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+ if (!t.hasException() && !ctx->done.exchange(true)) {
+ ctx->p.setValue(std::make_pair(i, std::move(t.value())));
+ } else if (++ctx->nFulfilled == ctx->nTotal) {
+ ctx->p.setException(t.exception());
+ }
+ });
+ return ctx->p.getFuture();
+}
+
// collectN (iterator)
template <class InputIterator>
}
typedef typename std::iterator_traits<It>::value_type::value_type ItT;
- typedef typename std::conditional<
- detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
+ typedef
+ typename std::conditional<detail::callableWith<F, T&&, Try<ItT>&&>::value,
+ Try<ItT>,
+ ItT>::type Arg;
typedef isTry<Arg> IsTry;
- folly::MoveWrapper<T> minitial(std::move(initial));
auto sfunc = std::make_shared<F>(std::move(func));
- auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
- return (*sfunc)(std::move(*minitial),
- head.template get<IsTry::value, Arg&&>());
- });
+ auto f = first->then(
+ [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
+ return (*sfunc)(
+ std::move(minitial), head.template get<IsTry::value, Arg&&>());
+ });
for (++first; first != last; ++first) {
f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
template <class T>
template <class I, class F>
Future<I> Future<T>::reduce(I&& initial, F&& func) {
- folly::MoveWrapper<I> minitial(std::move(initial));
- folly::MoveWrapper<F> mfunc(std::move(func));
- return then([minitial, mfunc](T& vals) mutable {
- auto ret = std::move(*minitial);
+ return then([
+ minitial = std::forward<I>(initial),
+ mfunc = std::forward<F>(func)
+ ](T& vals) mutable {
+ auto ret = std::move(minitial);
for (auto& val : vals) {
- ret = (*mfunc)(std::move(ret), std::move(val));
+ ret = mfunc(std::move(ret), std::move(val));
}
return ret;
});
UnorderedReduceContext(T&& memo, F&& fn, size_t n)
: lock_(), memo_(makeFuture<T>(std::move(memo))),
func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
- {};
+ {}
folly::MicroSpinLock lock_; // protects memo_ and numThens_
Future<T> memo_;
F func_;
auto ctx = std::make_shared<UnorderedReduceContext>(
std::move(initial), std::move(func), std::distance(first, last));
- mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
- folly::MoveWrapper<Try<ItT>> mt(std::move(t));
- // Futures can be completed in any order, simultaneously.
- // To make this non-blocking, we create a new Future chain in
- // the order of completion to reduce the values.
- // The spinlock just protects chaining a new Future, not actually
- // executing the reduce, which should be really fast.
- folly::MSLGuard lock(ctx->lock_);
- ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
- // Either return a ItT&& or a Try<ItT>&& depending
- // on the type of the argument of func.
- return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
- });
- if (++ctx->numThens_ == ctx->numFutures_) {
- // After reducing the value of the last Future, fulfill the Promise
- ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
- ctx->promise_.setValue(std::move(t2));
+ mapSetCallback<ItT>(
+ first,
+ last,
+ [ctx](size_t /* i */, Try<ItT>&& t) {
+ // Futures can be completed in any order, simultaneously.
+ // To make this non-blocking, we create a new Future chain in
+ // the order of completion to reduce the values.
+ // The spinlock just protects chaining a new Future, not actually
+ // executing the reduce, which should be really fast.
+ folly::MSLGuard lock(ctx->lock_);
+ ctx->memo_ =
+ ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
+ // Either return a ItT&& or a Try<ItT>&& depending
+ // on the type of the argument of func.
+ return ctx->func_(std::move(v),
+ mt.template get<IsTry::value, Arg&&>());
+ });
+ if (++ctx->numThens_ == ctx->numFutures_) {
+ // After reducing the value of the last Future, fulfill the Promise
+ ctx->memo_.setCallback_(
+ [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
+ }
});
- }
- });
return ctx->promise_.getFuture();
}
struct Context {
Context(E ex) : exception(std::move(ex)), promise() {}
E exception;
+ Future<Unit> thisFuture;
Promise<T> promise;
std::atomic<bool> token {false};
};
- auto ctx = std::make_shared<Context>(std::move(e));
+ std::shared_ptr<Timekeeper> tks;
if (!tk) {
- tk = folly::detail::getTimekeeperSingleton();
+ tks = folly::detail::getTimekeeperSingleton();
+ tk = DCHECK_NOTNULL(tks.get());
}
- tk->after(dur)
- .then([ctx](Try<void> const& t) {
- if (ctx->token.exchange(true) == false) {
- if (t.hasException()) {
- ctx->promise.setException(std::move(t.exception()));
- } else {
- ctx->promise.setException(std::move(ctx->exception));
- }
- }
- });
+ auto ctx = std::make_shared<Context>(std::move(e));
- this->then([ctx](Try<T>&& t) {
+ ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
+ // TODO: "this" completed first, cancel "after"
if (ctx->token.exchange(true) == false) {
ctx->promise.setTry(std::move(t));
}
});
+ tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
+ // "after" completed first, cancel "this"
+ ctx->thisFuture.raise(TimedOut());
+ if (ctx->token.exchange(true) == false) {
+ if (t.hasException()) {
+ ctx->promise.setException(std::move(t.exception()));
+ } else {
+ ctx->promise.setException(std::move(ctx->exception));
+ }
+ }
+ });
+
return ctx->promise.getFuture().via(getExecutor());
}
template <class T>
Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
return collectAll(*this, futures::sleep(dur, tk))
- .then([](std::tuple<Try<T>, Try<void>> tup) {
+ .then([](std::tuple<Try<T>, Try<Unit>> tup) {
Try<T>& t = std::get<0>(tup);
return makeFuture<T>(std::move(t));
});
// short-circuit if there's nothing to do
if (f.isReady()) return;
- folly::fibers::Baton baton;
- f = f.then([&](Try<T> t) {
- baton.post();
- return makeFuture(std::move(t));
- });
+ FutureBatonType baton;
+ f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
baton.wait();
-
- // There's a race here between the return here and the actual finishing of
- // the future. f is completed, but the setup may not have finished on done
- // after the baton has posted.
- while (!f.isReady()) {
- std::this_thread::yield();
- }
+ assert(f.isReady());
}
template <class T>
void waitImpl(Future<T>& f, Duration dur) {
// short-circuit if there's nothing to do
- if (f.isReady()) return;
+ if (f.isReady()) {
+ return;
+ }
- auto baton = std::make_shared<folly::fibers::Baton>();
- f = f.then([baton](Try<T> t) {
+ Promise<T> promise;
+ auto ret = promise.getFuture();
+ auto baton = std::make_shared<FutureBatonType>();
+ f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
+ promise.setTry(std::move(t));
baton->post();
- return makeFuture(std::move(t));
});
-
- // Let's preserve the invariant that if we did not timeout (timed_wait returns
- // true), then the returned Future is complete when it is returned to the
- // caller. We need to wait out the race for that Future to complete.
+ f = std::move(ret);
if (baton->timed_wait(dur)) {
- while (!f.isReady()) {
- std::this_thread::yield();
- }
+ assert(f.isReady());
}
}
template <class T>
void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
+ // Set callback so to ensure that the via executor has something on it
+ // so that once the preceding future triggers this callback, drive will
+ // always have a callback to satisfy it
+ if (f.isReady())
+ return;
+ f = f.via(e).then([](T&& t) { return std::move(t); });
while (!f.isReady()) {
e->drive();
}
+ assert(f.isReady());
}
} // detail
return std::move(wait().value());
}
-template <>
-inline void Future<void>::get() {
- wait().value();
-}
-
template <class T>
T Future<T>::get(Duration dur) {
wait(dur);
}
}
-template <>
-inline void Future<void>::get(Duration dur) {
- wait(dur);
- if (isReady()) {
- return;
- } else {
- throw TimedOut();
- }
-}
-
template <class T>
T Future<T>::getVia(DrivableExecutor* e) {
return std::move(waitVia(e).value());
}
-template <>
-inline void Future<void>::getVia(DrivableExecutor* e) {
- waitVia(e).value();
-}
-
namespace detail {
template <class T>
struct TryEquals {
return t1.value() == t2.value();
}
};
-
- template <>
- struct TryEquals<void> {
- static bool equals(const Try<void>& t1, const Try<void>& t2) {
- return true;
- }
- };
}
template <class T>
template <class T>
template <class F>
-Future<T> Future<T>::filter(F predicate) {
- auto p = folly::makeMoveWrapper(std::move(predicate));
- return this->then([p](T val) {
+Future<T> Future<T>::filter(F&& predicate) {
+ return this->then([p = std::forward<F>(predicate)](T val) {
T const& valConstRef = val;
- if (!(*p)(valConstRef)) {
+ if (!p(valConstRef)) {
throw PredicateDoesNotObtain();
}
return val;
});
}
-template <class T>
-template <class Callback>
-auto Future<T>::thenMulti(Callback&& fn)
- -> decltype(this->then(std::forward<Callback>(fn))) {
- // thenMulti with one callback is just a then
- return then(std::forward<Callback>(fn));
-}
-
-template <class T>
-template <class Callback, class... Callbacks>
-auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
- -> decltype(this->then(std::forward<Callback>(fn)).
- thenMulti(std::forward<Callbacks>(fns)...)) {
- // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
- return then(std::forward<Callback>(fn)).
- thenMulti(std::forward<Callbacks>(fns)...);
-}
-
-template <class T>
-template <class Callback, class... Callbacks>
-auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
- Callbacks&&... fns)
- -> decltype(this->then(std::forward<Callback>(fn)).
- thenMulti(std::forward<Callbacks>(fns)...)) {
- // thenMultiExecutor with two callbacks is
- // via(x).then(a).thenMulti(b, ...).via(oldX)
- auto oldX = getExecutor();
- setExecutor(x);
- return then(std::forward<Callback>(fn)).
- thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
+template <class F>
+inline Future<Unit> when(bool p, F&& thunk) {
+ return p ? std::forward<F>(thunk)().unit() : makeFuture();
+}
+
+template <class P, class F>
+Future<Unit> whileDo(P&& predicate, F&& thunk) {
+ if (predicate()) {
+ auto future = thunk();
+ return future.then([
+ predicate = std::forward<P>(predicate),
+ thunk = std::forward<F>(thunk)
+ ]() mutable {
+ return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
+ });
+ }
+ return makeFuture();
}
-template <class T>
-template <class Callback>
-auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
- -> decltype(this->then(std::forward<Callback>(fn))) {
- // thenMulti with one callback is just a then with an executor
- return then(x, std::forward<Callback>(fn));
+template <class F>
+Future<Unit> times(const int n, F&& thunk) {
+ return folly::whileDo(
+ [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
+ return count->fetch_add(1) < n;
+ },
+ std::forward<F>(thunk));
}
namespace futures {
}
}
+namespace futures {
+
+namespace detail {
+
+struct retrying_policy_raw_tag {};
+struct retrying_policy_fut_tag {};
+
+template <class Policy>
+struct retrying_policy_traits {
+ using result = std::result_of_t<Policy(size_t, const exception_wrapper&)>;
+ using is_raw = std::is_same<result, bool>;
+ using is_fut = std::is_same<result, Future<bool>>;
+ using tag = typename std::conditional<
+ is_raw::value, retrying_policy_raw_tag, typename std::conditional<
+ is_fut::value, retrying_policy_fut_tag, void>::type>::type;
+};
+
+template <class Policy, class FF, class Prom>
+void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
+ using F = typename std::result_of<FF(size_t)>::type;
+ using T = typename F::value_type;
+ auto f = makeFutureWith([&] { return ff(k++); });
+ f.then([
+ k,
+ prom = std::move(prom),
+ pm = std::forward<Policy>(p),
+ ffm = std::forward<FF>(ff)
+ ](Try<T> && t) mutable {
+ if (t.hasValue()) {
+ prom.setValue(std::move(t).value());
+ return;
+ }
+ auto& x = t.exception();
+ auto q = pm(k, x);
+ q.then([
+ k,
+ prom = std::move(prom),
+ xm = std::move(x),
+ pm = std::move(pm),
+ ffm = std::move(ffm)
+ ](bool shouldRetry) mutable {
+ if (shouldRetry) {
+ retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
+ } else {
+ prom.setException(std::move(xm));
+ };
+ });
+ });
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(size_t k, Policy&& p, FF&& ff) {
+ using F = typename std::result_of<FF(size_t)>::type;
+ using T = typename F::value_type;
+ auto prom = Promise<T>();
+ auto f = prom.getFuture();
+ retryingImpl(
+ k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
+ return f;
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
+ auto q = [pm = std::forward<Policy>(p)](size_t k, exception_wrapper x) {
+ return makeFuture<bool>(pm(k, x));
+ };
+ return retrying(0, std::move(q), std::forward<FF>(ff));
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
+ return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
+}
+
+// jittered exponential backoff, clamped to [backoff_min, backoff_max]
+template <class URNG>
+Duration retryingJitteredExponentialBackoffDur(
+ size_t n,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG& rng) {
+ using d = Duration;
+ auto dist = std::normal_distribution<double>(0.0, jitter_param);
+ auto jitter = std::exp(dist(rng));
+ auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
+ return std::max(backoff_min, std::min(backoff_max, backoff));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG&& rng,
+ Policy&& p) {
+ return [
+ pm = std::forward<Policy>(p),
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ rngp = std::forward<URNG>(rng)
+ ](size_t n, const exception_wrapper& ex) mutable {
+ if (n == max_tries) {
+ return makeFuture(false);
+ }
+ return pm(n, ex).then(
+ [ n, backoff_min, backoff_max, jitter_param, rngp = std::move(rngp) ](
+ bool v) mutable {
+ if (!v) {
+ return makeFuture(false);
+ }
+ auto backoff = detail::retryingJitteredExponentialBackoffDur(
+ n, backoff_min, backoff_max, jitter_param, rngp);
+ return futures::sleep(backoff).then([] { return true; });
+ });
+ };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG&& rng,
+ Policy&& p,
+ retrying_policy_raw_tag) {
+ auto q = [pm = std::forward<Policy>(p)](
+ size_t n, const exception_wrapper& e) {
+ return makeFuture(pm(n, e));
+ };
+ return retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ std::forward<URNG>(rng),
+ std::move(q));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG&& rng,
+ Policy&& p,
+ retrying_policy_fut_tag) {
+ return retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ std::forward<URNG>(rng),
+ std::forward<Policy>(p));
+}
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff) {
+ using tag = typename detail::retrying_policy_traits<Policy>::tag;
+ return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
+}
+
+inline
+std::function<bool(size_t, const exception_wrapper&)>
+retryingPolicyBasic(
+ size_t max_tries) {
+ return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG&& rng,
+ Policy&& p) {
+ using tag = typename detail::retrying_policy_traits<Policy>::tag;
+ return detail::retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ std::forward<URNG>(rng),
+ std::forward<Policy>(p),
+ tag());
+}
+
+inline
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param) {
+ auto p = [](size_t, const exception_wrapper&) { return true; };
+ return retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ ThreadLocalPRNG(),
+ std::move(p));
+}
+
+}
+
// Instantiate the most common Future types to save compile time
-extern template class Future<void>;
+extern template class Future<Unit>;
extern template class Future<bool>;
extern template class Future<int>;
extern template class Future<int64_t>;
extern template class Future<double>;
} // namespace folly
-
-// I haven't included a Future<T&> specialization because I don't forsee us
-// using it, however it is not difficult to add when needed. Refer to
-// Future<void> for guidance. std::future and boost::future code would also be
-// instructive.