/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#pragma once
+#include <algorithm>
+#include <cassert>
#include <chrono>
#include <thread>
-#include <folly/experimental/fibers/Baton.h>
+#include <folly/Baton.h>
#include <folly/Optional.h>
-#include <folly/futures/detail/Core.h>
+#include <folly/futures/InlineExecutor.h>
#include <folly/futures/Timekeeper.h>
+#include <folly/futures/detail/Core.h>
+
+#ifndef FOLLY_FUTURE_USING_FIBER
+#if FOLLY_MOBILE || defined(__APPLE__)
+#define FOLLY_FUTURE_USING_FIBER 0
+#else
+#define FOLLY_FUTURE_USING_FIBER 1
+#include <folly/fibers/Baton.h>
+#endif
+#endif
namespace folly {
class Timekeeper;
+namespace futures {
+namespace detail {
+#if FOLLY_FUTURE_USING_FIBER
+typedef folly::fibers::Baton FutureBatonType;
+#else
+typedef folly::Baton<> FutureBatonType;
+#endif
+} // namespace detail
+} // namespace futures
+
+namespace detail {
+std::shared_ptr<Timekeeper> getTimekeeperSingleton();
+} // namespace detail
+
+namespace futures {
namespace detail {
- Timekeeper* getTimekeeperSingleton();
+// Guarantees that the stored functor is destructed before the stored promise
+// may be fulfilled. Assumes the stored functor to be noexcept-destructible.
+template <typename T, typename F>
+class CoreCallbackState {
+ public:
+ template <typename FF>
+ CoreCallbackState(Promise<T>&& promise, FF&& func) noexcept(
+ noexcept(F(std::declval<FF>())))
+ : func_(std::forward<FF>(func)), promise_(std::move(promise)) {
+ assert(before_barrier());
+ }
+
+ CoreCallbackState(CoreCallbackState&& that) noexcept(
+ noexcept(F(std::declval<F>()))) {
+ if (that.before_barrier()) {
+ new (&func_) F(std::move(that.func_));
+ promise_ = that.stealPromise();
+ }
+ }
+
+ CoreCallbackState& operator=(CoreCallbackState&&) = delete;
+
+ ~CoreCallbackState() {
+ if (before_barrier()) {
+ stealPromise();
+ }
+ }
+
+ template <typename... Args>
+ auto invoke(Args&&... args) noexcept(
+ noexcept(std::declval<F&&>()(std::declval<Args&&>()...))) {
+ assert(before_barrier());
+ return std::move(func_)(std::forward<Args>(args)...);
+ }
+
+ template <typename... Args>
+ auto tryInvoke(Args&&... args) noexcept {
+ return makeTryWith([&] { return invoke(std::forward<Args>(args)...); });
+ }
+
+ void setTry(Try<T>&& t) {
+ stealPromise().setTry(std::move(t));
+ }
+
+ void setException(exception_wrapper&& ew) {
+ stealPromise().setException(std::move(ew));
+ }
+
+ Promise<T> stealPromise() noexcept {
+ assert(before_barrier());
+ func_.~F();
+ return std::move(promise_);
+ }
+
+ private:
+ bool before_barrier() const noexcept {
+ return !promise_.isFulfilled();
+ }
+
+ union {
+ F func_;
+ };
+ Promise<T> promise_{Promise<T>::makeEmpty()};
+};
+
+template <typename T, typename F>
+inline auto makeCoreCallbackState(Promise<T>&& p, F&& f) noexcept(
+ noexcept(CoreCallbackState<T, _t<std::decay<F>>>(
+ std::declval<Promise<T>&&>(),
+ std::declval<F&&>()))) {
+ return CoreCallbackState<T, _t<std::decay<F>>>(
+ std::move(p), std::forward<F>(f));
+}
+} // namespace detail
+} // namespace futures
+
+template <class T>
+SemiFuture<typename std::decay<T>::type> makeSemiFuture(T&& t) {
+ return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
+}
+
+// makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
+template <class F>
+typename std::enable_if<
+ isSemiFuture<typename std::result_of<F()>::type>::value,
+ typename std::result_of<F()>::type>::type
+makeSemiFutureWith(F&& func) {
+ using InnerType =
+ typename isSemiFuture<typename std::result_of<F()>::type>::Inner;
+ try {
+ return std::forward<F>(func)();
+ } catch (std::exception& e) {
+ return makeSemiFuture<InnerType>(
+ exception_wrapper(std::current_exception(), e));
+ } catch (...) {
+ return makeSemiFuture<InnerType>(
+ exception_wrapper(std::current_exception()));
+ }
+}
+
+// makeSemiFutureWith(T()) -> SemiFuture<T>
+// makeSemiFutureWith(void()) -> SemiFuture<Unit>
+template <class F>
+typename std::enable_if<
+ !(isSemiFuture<typename std::result_of<F()>::type>::value),
+ SemiFuture<Unit::LiftT<typename std::result_of<F()>::type>>>::type
+makeSemiFutureWith(F&& func) {
+ using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
+ return makeSemiFuture<LiftedResult>(
+ makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
}
template <class T>
-Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
+SemiFuture<T> makeSemiFuture(std::exception_ptr const& e) {
+ return makeSemiFuture(Try<T>(e));
+}
+
+template <class T>
+SemiFuture<T> makeSemiFuture(exception_wrapper ew) {
+ return makeSemiFuture(Try<T>(std::move(ew)));
+}
+
+template <class T, class E>
+typename std::
+ enable_if<std::is_base_of<std::exception, E>::value, SemiFuture<T>>::type
+ makeSemiFuture(E const& e) {
+ return makeSemiFuture(Try<T>(make_exception_wrapper<E>(e)));
+}
+
+template <class T>
+SemiFuture<T> makeSemiFuture(Try<T>&& t) {
+ return SemiFuture<T>(new futures::detail::Core<T>(std::move(t)));
+}
+
+template <class T>
+SemiFuture<T> SemiFuture<T>::makeEmpty() {
+ return SemiFuture<T>(futures::detail::EmptyConstruct{});
+}
+
+template <class T>
+SemiFuture<T>::SemiFuture(SemiFuture<T>&& other) noexcept : core_(other.core_) {
other.core_ = nullptr;
}
template <class T>
-Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
+SemiFuture<T>& SemiFuture<T>::operator=(SemiFuture<T>&& other) noexcept {
std::swap(core_, other.core_);
return *this;
}
template <class T>
-template <class T2,
- typename std::enable_if<!isFuture<T2>::value, void*>::type>
-Future<T>::Future(T2&& val) : core_(nullptr) {
- Promise<T> p;
- p.setValue(std::forward<T2>(val));
- *this = p.getFuture();
+SemiFuture<T>::SemiFuture(Future<T>&& other) noexcept : core_(other.core_) {
+ other.core_ = nullptr;
}
template <class T>
-template <class T2,
- typename std::enable_if<
- folly::is_void_or_unit<T2>::value,
- int>::type>
-Future<T>::Future() : core_(nullptr) {
- Promise<T> p;
- p.setValue();
- *this = p.getFuture();
+SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
+ std::swap(core_, other.core_);
+ return *this;
}
+template <class T>
+template <class T2, typename>
+SemiFuture<T>::SemiFuture(T2&& val)
+ : core_(new futures::detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
template <class T>
-Future<T>::~Future() {
+template <typename T2>
+SemiFuture<T>::SemiFuture(
+ typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
+ : core_(new futures::detail::Core<T>(Try<T>(T()))) {}
+
+template <class T>
+template <
+ class... Args,
+ typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
+ type>
+SemiFuture<T>::SemiFuture(in_place_t, Args&&... args)
+ : core_(
+ new futures::detail::Core<T>(in_place, std::forward<Args>(args)...)) {
+}
+
+template <class T>
+SemiFuture<T>::~SemiFuture() {
detach();
}
+// This must be defined after the constructors to avoid a bug in MSVC
+// https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
+inline SemiFuture<Unit> makeSemiFuture() {
+ return makeSemiFuture(Unit{});
+}
+
+template <class T>
+T& SemiFuture<T>::value() & {
+ throwIfInvalid();
+
+ return core_->getTry().value();
+}
+
+template <class T>
+T const& SemiFuture<T>::value() const& {
+ throwIfInvalid();
+
+ return core_->getTry().value();
+}
+
+template <class T>
+T&& SemiFuture<T>::value() && {
+ throwIfInvalid();
+
+ return std::move(core_->getTry().value());
+}
+
template <class T>
-void Future<T>::detach() {
+T const&& SemiFuture<T>::value() const&& {
+ throwIfInvalid();
+
+ return std::move(core_->getTry().value());
+}
+
+template <class T>
+inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
+ throwIfInvalid();
+
+ setExecutor(executor, priority);
+
+ auto newFuture = Future<T>(core_);
+ core_ = nullptr;
+ return newFuture;
+}
+
+template <class T>
+inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) & {
+ throwIfInvalid();
+ Promise<T> p;
+ auto f = p.getFuture();
+ auto func = [p = std::move(p)](Try<T>&& t) mutable {
+ p.setTry(std::move(t));
+ };
+ using R = futures::detail::callableResult<T, decltype(func)>;
+ thenImplementation<decltype(func), R>(std::move(func), typename R::Arg());
+ return std::move(f).via(executor, priority);
+}
+
+template <class T>
+bool SemiFuture<T>::isReady() const {
+ throwIfInvalid();
+ return core_->ready();
+}
+
+template <class T>
+bool SemiFuture<T>::hasValue() {
+ return getTry().hasValue();
+}
+
+template <class T>
+bool SemiFuture<T>::hasException() {
+ return getTry().hasException();
+}
+
+template <class T>
+void SemiFuture<T>::detach() {
if (core_) {
core_->detachFuture();
core_ = nullptr;
}
template <class T>
-void Future<T>::throwIfInvalid() const {
- if (!core_)
- throw NoState();
+Try<T>& SemiFuture<T>::getTry() {
+ throwIfInvalid();
+
+ return core_->getTry();
+}
+
+template <class T>
+void SemiFuture<T>::throwIfInvalid() const {
+ if (!core_) {
+ throwNoState();
+}
+}
+
+template <class T>
+Optional<Try<T>> SemiFuture<T>::poll() {
+ Optional<Try<T>> o;
+ if (core_->ready()) {
+ o = std::move(core_->getTry());
+ }
+ return o;
+}
+
+template <class T>
+void SemiFuture<T>::raise(exception_wrapper exception) {
+ core_->raise(std::move(exception));
}
template <class T>
template <class F>
-void Future<T>::setCallback_(F&& func) {
+void SemiFuture<T>::setCallback_(F&& func) {
throwIfInvalid();
- core_->setCallback(std::move(func));
+ core_->setCallback(std::forward<F>(func));
+}
+
+template <class T>
+SemiFuture<T>::SemiFuture(futures::detail::EmptyConstruct) noexcept
+ : core_(nullptr) {}
+
+template <class T>
+Future<T> Future<T>::makeEmpty() {
+ return Future<T>(futures::detail::EmptyConstruct{});
+}
+
+template <class T>
+Future<T>::Future(Future<T>&& other) noexcept
+ : SemiFuture<T>(std::move(other)) {}
+
+template <class T>
+Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
+ SemiFuture<T>::operator=(SemiFuture<T>{std::move(other)});
+ return *this;
+}
+
+template <class T>
+template <
+ class T2,
+ typename std::enable_if<
+ !std::is_same<T, typename std::decay<T2>::type>::value &&
+ std::is_constructible<T, T2&&>::value &&
+ std::is_convertible<T2&&, T>::value,
+ int>::type>
+Future<T>::Future(Future<T2>&& other)
+ : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
+
+template <class T>
+template <
+ class T2,
+ typename std::enable_if<
+ !std::is_same<T, typename std::decay<T2>::type>::value &&
+ std::is_constructible<T, T2&&>::value &&
+ !std::is_convertible<T2&&, T>::value,
+ int>::type>
+Future<T>::Future(Future<T2>&& other)
+ : Future(std::move(other).then([](T2&& v) { return T(std::move(v)); })) {}
+
+template <class T>
+template <
+ class T2,
+ typename std::enable_if<
+ !std::is_same<T, typename std::decay<T2>::type>::value &&
+ std::is_constructible<T, T2&&>::value,
+ int>::type>
+Future<T>& Future<T>::operator=(Future<T2>&& other) {
+ return operator=(
+ std::move(other).then([](T2&& v) { return T(std::move(v)); }));
+}
+
+// TODO: isSemiFuture
+template <class T>
+template <class T2, typename>
+Future<T>::Future(T2&& val) : SemiFuture<T>(std::forward<T2>(val)) {}
+
+template <class T>
+template <typename T2>
+Future<T>::Future(typename std::enable_if<std::is_same<Unit, T2>::value>::type*)
+ : SemiFuture<T>() {}
+
+template <class T>
+template <
+ class... Args,
+ typename std::enable_if<std::is_constructible<T, Args&&...>::value, int>::
+ type>
+Future<T>::Future(in_place_t, Args&&... args)
+ : SemiFuture<T>(in_place, std::forward<Args>(args)...) {}
+
+template <class T>
+Future<T>::~Future() {
}
// unwrap
template <class T>
template <typename F, typename R, bool isTry, typename... Args>
typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
-Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
+SemiFuture<T>::thenImplementation(
+ F&& func,
+ futures::detail::argResult<isTry, F, Args...>) {
static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
typedef typename R::ReturnsFuture::Inner B;
- throwIfInvalid();
+ this->throwIfInvalid();
- // wrap these so we can move them into the lambda
- folly::MoveWrapper<Promise<B>> p;
- folly::MoveWrapper<F> funcm(std::forward<F>(func));
+ Promise<B> p;
+ p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
// grab the Future now before we lose our handle on the Promise
- auto f = p->getFuture();
- if (getExecutor()) {
- f.setExecutor(getExecutor());
- }
+ auto f = p.getFuture();
+ f.core_->setExecutorNoLock(this->getExecutor());
/* This is a bit tricky.
persist beyond the callback, if it gets moved), and so it is an
optimization to just make it shared from the get-go.
- We have to move in the Promise and func using the MoveWrapper
- hack. (func could be copied but it's a big drag on perf).
-
- Two subtle but important points about this design. detail::Core has no
- back pointers to Future or Promise, so if Future or Promise get moved
- (and they will be moved in performant code) we don't have to do
+ Two subtle but important points about this design. futures::detail::Core
+ has no back pointers to Future or Promise, so if Future or Promise get
+ moved (and they will be moved in performant code) we don't have to do
anything fancy. And because we store the continuation in the
- detail::Core, not in the Future, we can execute the continuation even
- after the Future has gone out of scope. This is an intentional design
+ futures::detail::Core, not in the Future, we can execute the continuation
+ even after the Future has gone out of scope. This is an intentional design
decision. It is likely we will want to be able to cancel a continuation
in some circumstances, but I think it should be explicit not implicit
in the destruction of the Future used to create it.
*/
- setCallback_(
- [p, funcm](Try<T>&& t) mutable {
- if (!isTry && t.hasException()) {
- p->setException(std::move(t.exception()));
- } else {
- p->setWith([&]() {
- return (*funcm)(t.template get<isTry, Args>()...);
- });
- }
- });
+ this->setCallback_(
+ [state = futures::detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
+ if (!isTry && t.hasException()) {
+ state.setException(std::move(t.exception()));
+ } else {
+ state.setTry(makeTryWith(
+ [&] { return state.invoke(t.template get<isTry, Args>()...); }));
+ }
+ });
return f;
}
template <class T>
template <typename F, typename R, bool isTry, typename... Args>
typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
-Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
+SemiFuture<T>::thenImplementation(
+ F&& func,
+ futures::detail::argResult<isTry, F, Args...>) {
static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
typedef typename R::ReturnsFuture::Inner B;
+ this->throwIfInvalid();
- throwIfInvalid();
-
- // wrap these so we can move them into the lambda
- folly::MoveWrapper<Promise<B>> p;
- folly::MoveWrapper<F> funcm(std::forward<F>(func));
+ Promise<B> p;
+ p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
// grab the Future now before we lose our handle on the Promise
- auto f = p->getFuture();
- if (getExecutor()) {
- f.setExecutor(getExecutor());
- }
+ auto f = p.getFuture();
+ f.core_->setExecutorNoLock(this->getExecutor());
- setCallback_(
- [p, funcm](Try<T>&& t) mutable {
- if (!isTry && t.hasException()) {
- p->setException(std::move(t.exception()));
- } else {
- try {
- auto f2 = (*funcm)(t.template get<isTry, Args>()...);
- // that didn't throw, now we can steal p
- f2.setCallback_([p](Try<B>&& b) mutable {
- p->setTry(std::move(b));
- });
- } catch (const std::exception& e) {
- p->setException(exception_wrapper(std::current_exception(), e));
- } catch (...) {
- p->setException(exception_wrapper(std::current_exception()));
+ this->setCallback_(
+ [state = futures::detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
+ if (!isTry && t.hasException()) {
+ state.setException(std::move(t.exception()));
+ } else {
+ auto tf2 = state.tryInvoke(t.template get<isTry, Args>()...);
+ if (tf2.hasException()) {
+ state.setException(std::move(tf2.exception()));
+ } else {
+ tf2->setCallback_([p = state.stealPromise()](Try<B> && b) mutable {
+ p.setTry(std::move(b));
+ });
+ }
}
- }
- });
+ });
return f;
}
template <typename R, typename Caller, typename... Args>
Future<typename isFuture<R>::Inner>
Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
- typedef typename std::remove_cv<
- typename std::remove_reference<
- typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
+ typedef typename std::remove_cv<typename std::remove_reference<
+ typename futures::detail::ArgType<Args...>::FirstArg>::type>::type
+ FirstArg;
+
return then([instance, func](Try<T>&& t){
return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
});
}
-// TODO(6838553)
-#ifndef __clang__
-template <class T>
-template <class... Args>
-auto Future<T>::then(Executor* x, Args&&... args)
- -> decltype(this->then(std::forward<Args>(args)...))
-{
- auto oldX = getExecutor();
- setExecutor(x);
- return this->then(std::forward<Args>(args)...).via(oldX);
-}
-#endif
-
template <class T>
-Future<void> Future<T>::then() {
- return then([] (Try<T>&& t) {});
+Future<Unit> Future<T>::then() {
+ return then([] () {});
}
// onError where the callback returns T
template <class T>
template <class F>
typename std::enable_if<
- !detail::callableWith<F, exception_wrapper>::value &&
- !detail::Extract<F>::ReturnsFuture::value,
- Future<T>>::type
+ !futures::detail::callableWith<F, exception_wrapper>::value &&
+ !futures::detail::callableWith<F, exception_wrapper&>::value &&
+ !futures::detail::Extract<F>::ReturnsFuture::value,
+ Future<T>>::type
Future<T>::onError(F&& func) {
- typedef typename detail::Extract<F>::FirstArg Exn;
+ typedef std::remove_reference_t<
+ typename futures::detail::Extract<F>::FirstArg>
+ Exn;
static_assert(
- std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
+ std::is_same<typename futures::detail::Extract<F>::RawReturn, T>::value,
"Return type of onError callback must be T or Future<T>");
Promise<T> p;
+ p.core_->setInterruptHandlerNoLock(this->core_->getInterruptHandler());
auto f = p.getFuture();
- auto pm = folly::makeMoveWrapper(std::move(p));
- auto funcm = folly::makeMoveWrapper(std::move(func));
- setCallback_([pm, funcm](Try<T>&& t) mutable {
- if (!t.template withException<Exn>([&] (Exn& e) {
- pm->setWith([&]{
- return (*funcm)(e);
- });
- })) {
- pm->setTry(std::move(t));
- }
- });
+
+ this->setCallback_(
+ [state = futures::detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
+ if (auto e = t.template tryGetExceptionObject<Exn>()) {
+ state.setTry(makeTryWith([&] { return state.invoke(*e); }));
+ } else {
+ state.setTry(std::move(t));
+ }
+ });
return f;
}
template <class T>
template <class F>
typename std::enable_if<
- !detail::callableWith<F, exception_wrapper>::value &&
- detail::Extract<F>::ReturnsFuture::value,
- Future<T>>::type
+ !futures::detail::callableWith<F, exception_wrapper>::value &&
+ !futures::detail::callableWith<F, exception_wrapper&>::value &&
+ futures::detail::Extract<F>::ReturnsFuture::value,
+ Future<T>>::type
Future<T>::onError(F&& func) {
static_assert(
- std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
+ std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
+ value,
"Return type of onError callback must be T or Future<T>");
- typedef typename detail::Extract<F>::FirstArg Exn;
+ typedef std::remove_reference_t<
+ typename futures::detail::Extract<F>::FirstArg>
+ Exn;
Promise<T> p;
auto f = p.getFuture();
- auto pm = folly::makeMoveWrapper(std::move(p));
- auto funcm = folly::makeMoveWrapper(std::move(func));
- setCallback_([pm, funcm](Try<T>&& t) mutable {
- if (!t.template withException<Exn>([&] (Exn& e) {
- try {
- auto f2 = (*funcm)(e);
- f2.setCallback_([pm](Try<T>&& t2) mutable {
- pm->setTry(std::move(t2));
+
+ this->setCallback_(
+ [state = futures::detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
+ if (auto e = t.template tryGetExceptionObject<Exn>()) {
+ auto tf2 = state.tryInvoke(*e);
+ if (tf2.hasException()) {
+ state.setException(std::move(tf2.exception()));
+ } else {
+ tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
+ p.setTry(std::move(t3));
});
- } catch (const std::exception& e2) {
- pm->setException(exception_wrapper(std::current_exception(), e2));
- } catch (...) {
- pm->setException(exception_wrapper(std::current_exception()));
}
- })) {
- pm->setTry(std::move(t));
- }
- });
+ } else {
+ state.setTry(std::move(t));
+ }
+ });
return f;
}
template <class T>
template <class F>
-Future<T> Future<T>::ensure(F func) {
- MoveWrapper<F> funcw(std::move(func));
- return this->then([funcw](Try<T>&& t) {
- (*funcw)();
+Future<T> Future<T>::ensure(F&& func) {
+ return this->then([funcw = std::forward<F>(func)](Try<T> && t) mutable {
+ std::move(funcw)();
return makeFuture(std::move(t));
});
}
template <class T>
template <class F>
Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
- auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
- return within(dur, tk)
- .onError([funcw](TimedOut const&) { return (*funcw)(); });
+ return within(dur, tk).onError([funcw = std::forward<F>(func)](
+ TimedOut const&) { return std::move(funcw)(); });
}
template <class T>
template <class F>
typename std::enable_if<
- detail::callableWith<F, exception_wrapper>::value &&
- detail::Extract<F>::ReturnsFuture::value,
- Future<T>>::type
+ futures::detail::callableWith<F, exception_wrapper>::value &&
+ futures::detail::Extract<F>::ReturnsFuture::value,
+ Future<T>>::type
Future<T>::onError(F&& func) {
static_assert(
- std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
+ std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
+ value,
"Return type of onError callback must be T or Future<T>");
Promise<T> p;
auto f = p.getFuture();
- auto pm = folly::makeMoveWrapper(std::move(p));
- auto funcm = folly::makeMoveWrapper(std::move(func));
- setCallback_([pm, funcm](Try<T> t) mutable {
- if (t.hasException()) {
- try {
- auto f2 = (*funcm)(std::move(t.exception()));
- f2.setCallback_([pm](Try<T> t2) mutable {
- pm->setTry(std::move(t2));
- });
- } catch (const std::exception& e2) {
- pm->setException(exception_wrapper(std::current_exception(), e2));
- } catch (...) {
- pm->setException(exception_wrapper(std::current_exception()));
- }
- } else {
- pm->setTry(std::move(t));
- }
- });
+ this->setCallback_(
+ [state = futures::detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T> t) mutable {
+ if (t.hasException()) {
+ auto tf2 = state.tryInvoke(std::move(t.exception()));
+ if (tf2.hasException()) {
+ state.setException(std::move(tf2.exception()));
+ } else {
+ tf2->setCallback_([p = state.stealPromise()](Try<T> && t3) mutable {
+ p.setTry(std::move(t3));
+ });
+ }
+ } else {
+ state.setTry(std::move(t));
+ }
+ });
return f;
}
template <class T>
template <class F>
typename std::enable_if<
- detail::callableWith<F, exception_wrapper>::value &&
- !detail::Extract<F>::ReturnsFuture::value,
- Future<T>>::type
+ futures::detail::callableWith<F, exception_wrapper>::value &&
+ !futures::detail::Extract<F>::ReturnsFuture::value,
+ Future<T>>::type
Future<T>::onError(F&& func) {
static_assert(
- std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
+ std::is_same<typename futures::detail::Extract<F>::Return, Future<T>>::
+ value,
"Return type of onError callback must be T or Future<T>");
Promise<T> p;
auto f = p.getFuture();
- auto pm = folly::makeMoveWrapper(std::move(p));
- auto funcm = folly::makeMoveWrapper(std::move(func));
- setCallback_([pm, funcm](Try<T> t) mutable {
- if (t.hasException()) {
- pm->setWith([&]{
- return (*funcm)(std::move(t.exception()));
+ this->setCallback_(
+ [state = futures::detail::makeCoreCallbackState(
+ std::move(p), std::forward<F>(func))](Try<T>&& t) mutable {
+ if (t.hasException()) {
+ state.setTry(makeTryWith(
+ [&] { return state.invoke(std::move(t.exception())); }));
+ } else {
+ state.setTry(std::move(t));
+ }
});
- } else {
- pm->setTry(std::move(t));
- }
- });
return f;
}
template <class T>
-typename std::add_lvalue_reference<T>::type Future<T>::value() {
- throwIfInvalid();
-
- return core_->getTry().value();
+Try<T>& Future<T>::getTryVia(DrivableExecutor* e) {
+ return waitVia(e).getTry();
}
-template <class T>
-typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
- throwIfInvalid();
-
- return core_->getTry().value();
+template <class Func>
+auto via(Executor* x, Func&& func)
+ -> Future<typename isFuture<decltype(std::declval<Func>()())>::Inner> {
+ // TODO make this actually more performant. :-P #7260175
+ return via(x).then(std::forward<Func>(func));
}
template <class T>
-Try<T>& Future<T>::getTry() {
- throwIfInvalid();
-
- return core_->getTry();
-}
-
-template <class T>
-Optional<Try<T>> Future<T>::poll() {
- Optional<Try<T>> o;
- if (core_->ready()) {
- o = std::move(core_->getTry());
- }
- return o;
-}
-
-template <class T>
-template <typename Executor>
-inline Future<T> Future<T>::via(Executor* executor) && {
- throwIfInvalid();
-
- setExecutor(executor);
-
- return std::move(*this);
-}
-
-template <class T>
-template <typename Executor>
-inline Future<T> Future<T>::via(Executor* executor) & {
- throwIfInvalid();
-
- MoveWrapper<Promise<T>> p;
- auto f = p->getFuture();
- then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
- return std::move(f).via(executor);
-}
-
-template <class T>
-bool Future<T>::isReady() const {
- throwIfInvalid();
- return core_->ready();
-}
-
-template <class T>
-void Future<T>::raise(exception_wrapper exception) {
- core_->raise(std::move(exception));
-}
+Future<T>::Future(futures::detail::EmptyConstruct) noexcept
+ : SemiFuture<T>(futures::detail::EmptyConstruct{}) {}
// makeFuture
template <class T>
Future<typename std::decay<T>::type> makeFuture(T&& t) {
- Promise<typename std::decay<T>::type> p;
- p.setValue(std::forward<T>(t));
- return p.getFuture();
+ return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
}
-inline // for multiple translation units
-Future<void> makeFuture() {
- Promise<void> p;
- p.setValue();
- return p.getFuture();
+inline Future<Unit> makeFuture() {
+ return makeFuture(Unit{});
}
+// makeFutureWith(Future<T>()) -> Future<T>
template <class F>
-auto makeFutureWith(
- F&& func,
- typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
- -> Future<decltype(func())> {
- Promise<decltype(func())> p;
- p.setWith(
- [&func]() {
- return (func)();
- });
- return p.getFuture();
+typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
+ typename std::result_of<F()>::type>::type
+makeFutureWith(F&& func) {
+ using InnerType =
+ typename isFuture<typename std::result_of<F()>::type>::Inner;
+ try {
+ return std::forward<F>(func)();
+ } catch (std::exception& e) {
+ return makeFuture<InnerType>(
+ exception_wrapper(std::current_exception(), e));
+ } catch (...) {
+ return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
+ }
}
+// makeFutureWith(T()) -> Future<T>
+// makeFutureWith(void()) -> Future<Unit>
template <class F>
-auto makeFutureWith(F const& func) -> Future<decltype(func())> {
- F copy = func;
- return makeFutureWith(std::move(copy));
+typename std::enable_if<
+ !(isFuture<typename std::result_of<F()>::type>::value),
+ Future<Unit::LiftT<typename std::result_of<F()>::type>>>::type
+makeFutureWith(F&& func) {
+ using LiftedResult = Unit::LiftT<typename std::result_of<F()>::type>;
+ return makeFuture<LiftedResult>(
+ makeTryWith([&func]() mutable { return std::forward<F>(func)(); }));
}
template <class T>
Future<T> makeFuture(std::exception_ptr const& e) {
- Promise<T> p;
- p.setException(e);
- return p.getFuture();
+ return makeFuture(Try<T>(e));
}
template <class T>
Future<T> makeFuture(exception_wrapper ew) {
- Promise<T> p;
- p.setException(std::move(ew));
- return p.getFuture();
+ return makeFuture(Try<T>(std::move(ew)));
}
template <class T, class E>
typename std::enable_if<std::is_base_of<std::exception, E>::value,
Future<T>>::type
makeFuture(E const& e) {
- Promise<T> p;
- p.setException(make_exception_wrapper<E>(e));
- return p.getFuture();
+ return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
}
template <class T>
Future<T> makeFuture(Try<T>&& t) {
- Promise<typename std::decay<T>::type> p;
- p.setTry(std::move(t));
- return p.getFuture();
+ return Future<T>(new futures::detail::Core<T>(std::move(t)));
}
-template <>
-inline Future<void> makeFuture(Try<void>&& t) {
- if (t.hasException()) {
- return makeFuture<void>(std::move(t.exception()));
- } else {
- return makeFuture();
- }
+// via
+Future<Unit> via(Executor* executor, int8_t priority) {
+ return makeFuture().via(executor, priority);
}
-// via
-template <typename Executor>
-Future<void> via(Executor* executor) {
- return makeFuture().via(executor);
+// mapSetCallback calls func(i, Try<T>) when every future completes
+
+template <class T, class InputIterator, class F>
+void mapSetCallback(InputIterator first, InputIterator last, F func) {
+ for (size_t i = 0; first != last; ++first, ++i) {
+ first->setCallback_([func, i](Try<T>&& t) {
+ func(i, std::move(t));
+ });
+ }
}
-// when (variadic)
+// collectAll (variadic)
template <typename... Fs>
-typename detail::VariadicContext<
- typename std::decay<Fs>::type::value_type...>::type
+typename futures::detail::CollectAllVariadicContext<
+ typename std::decay<Fs>::type::value_type...>::type
collectAll(Fs&&... fs) {
- auto ctx =
- new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
- ctx->total = sizeof...(fs);
- auto f_saved = ctx->p.getFuture();
- detail::collectAllVariadicHelper(ctx,
- std::forward<typename std::decay<Fs>::type>(fs)...);
- return f_saved;
+ auto ctx = std::make_shared<futures::detail::CollectAllVariadicContext<
+ typename std::decay<Fs>::type::value_type...>>();
+ futures::detail::collectVariadicHelper<
+ futures::detail::CollectAllVariadicContext>(ctx, std::forward<Fs>(fs)...);
+ return ctx->p.getFuture();
}
-// when (iterator)
+// collectAll (iterator)
template <class InputIterator>
Future<
typedef
typename std::iterator_traits<InputIterator>::value_type::value_type T;
- if (first >= last) {
- return makeFuture(std::vector<Try<T>>());
- }
- size_t n = std::distance(first, last);
-
- auto ctx = new detail::WhenAllContext<T>();
-
- ctx->results.resize(n);
-
- auto f_saved = ctx->p.getFuture();
-
- for (size_t i = 0; first != last; ++first, ++i) {
- assert(i < n);
- auto& f = *first;
- f.setCallback_([ctx, i, n](Try<T> t) {
- ctx->results[i] = std::move(t);
- if (++ctx->count == n) {
- ctx->p.setValue(std::move(ctx->results));
- delete ctx;
- }
- });
- }
+ struct CollectAllContext {
+ CollectAllContext(size_t n) : results(n) {}
+ ~CollectAllContext() {
+ p.setValue(std::move(results));
+ }
+ Promise<std::vector<Try<T>>> p;
+ std::vector<Try<T>> results;
+ };
- return f_saved;
+ auto ctx =
+ std::make_shared<CollectAllContext>(size_t(std::distance(first, last)));
+ mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+ ctx->results[i] = std::move(t);
+ });
+ return ctx->p.getFuture();
}
-namespace detail {
-
-template <class, class, typename = void> struct CollectContextHelper;
-
-template <class T, class VecT>
-struct CollectContextHelper<T, VecT,
- typename std::enable_if<std::is_same<T, VecT>::value>::type> {
- static inline std::vector<T>&& getResults(std::vector<VecT>& results) {
- return std::move(results);
- }
-};
+// collect (iterator)
-template <class T, class VecT>
-struct CollectContextHelper<T, VecT,
- typename std::enable_if<!std::is_same<T, VecT>::value>::type> {
- static inline std::vector<T> getResults(std::vector<VecT>& results) {
- std::vector<T> finalResults;
- finalResults.reserve(results.size());
- for (auto& opt : results) {
- finalResults.push_back(std::move(opt.value()));
- }
- return finalResults;
- }
-};
+namespace futures {
+namespace detail {
template <typename T>
struct CollectContext {
+ struct Nothing {
+ explicit Nothing(int /* n */) {}
+ };
- typedef typename std::conditional<
- std::is_default_constructible<T>::value,
- T,
- Optional<T>
- >::type VecT;
-
- explicit CollectContext(int n) : count(0), success_count(0), threw(false) {
- results.resize(n);
- }
-
- Promise<std::vector<T>> p;
- std::vector<VecT> results;
- std::atomic<size_t> count, success_count;
- std::atomic_bool threw;
-
- typedef std::vector<T> result_type;
-
- static inline Future<std::vector<T>> makeEmptyFuture() {
- return makeFuture(std::vector<T>());
- }
-
- inline void setValue() {
- p.setValue(CollectContextHelper<T, VecT>::getResults(results));
- }
-
- inline void addResult(int i, Try<T>& t) {
- results[i] = std::move(t.value());
- }
-};
-
-template <>
-struct CollectContext<void> {
-
- explicit CollectContext(int n) : count(0), success_count(0), threw(false) {}
-
- Promise<void> p;
- std::atomic<size_t> count, success_count;
- std::atomic_bool threw;
-
- typedef void result_type;
-
- static inline Future<void> makeEmptyFuture() {
- return makeFuture();
- }
-
- inline void setValue() {
- p.setValue();
+ using Result = typename std::conditional<
+ std::is_void<T>::value,
+ void,
+ std::vector<T>>::type;
+
+ using InternalResult = typename std::conditional<
+ std::is_void<T>::value,
+ Nothing,
+ std::vector<Optional<T>>>::type;
+
+ explicit CollectContext(size_t n) : result(n) {}
+ ~CollectContext() {
+ if (!threw.exchange(true)) {
+ // map Optional<T> -> T
+ std::vector<T> finalResult;
+ finalResult.reserve(result.size());
+ std::transform(result.begin(), result.end(),
+ std::back_inserter(finalResult),
+ [](Optional<T>& o) { return std::move(o.value()); });
+ p.setValue(std::move(finalResult));
+ }
}
-
- inline void addResult(int i, Try<void>& t) {
- // do nothing
+ inline void setPartialResult(size_t i, Try<T>& t) {
+ result[i] = std::move(t.value());
}
+ Promise<Result> p;
+ InternalResult result;
+ std::atomic<bool> threw {false};
};
-} // detail
+} // namespace detail
+} // namespace futures
template <class InputIterator>
-Future<typename detail::CollectContext<
- typename std::iterator_traits<InputIterator>::value_type::value_type
->::result_type>
+Future<typename futures::detail::CollectContext<typename std::iterator_traits<
+ InputIterator>::value_type::value_type>::Result>
collect(InputIterator first, InputIterator last) {
typedef
typename std::iterator_traits<InputIterator>::value_type::value_type T;
- if (first >= last) {
- return detail::CollectContext<T>::makeEmptyFuture();
- }
-
- size_t n = std::distance(first, last);
- auto ctx = new detail::CollectContext<T>(n);
- auto f_saved = ctx->p.getFuture();
-
- for (size_t i = 0; first != last; ++first, ++i) {
- assert(i < n);
- auto& f = *first;
- f.setCallback_([ctx, i, n](Try<T> t) {
-
- if (t.hasException()) {
- if (!ctx->threw.exchange(true)) {
- ctx->p.setException(std::move(t.exception()));
- }
- } else if (!ctx->threw) {
- ctx->addResult(i, t);
- if (++ctx->success_count == n) {
- ctx->setValue();
- }
+ auto ctx = std::make_shared<futures::detail::CollectContext<T>>(
+ std::distance(first, last));
+ mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+ if (t.hasException()) {
+ if (!ctx->threw.exchange(true)) {
+ ctx->p.setException(std::move(t.exception()));
}
+ } else if (!ctx->threw) {
+ ctx->setPartialResult(i, t);
+ }
+ });
+ return ctx->p.getFuture();
+}
- if (++ctx->count == n) {
- delete ctx;
- }
- });
- }
+// collect (variadic)
- return f_saved;
+template <typename... Fs>
+typename futures::detail::CollectVariadicContext<
+ typename std::decay<Fs>::type::value_type...>::type
+collect(Fs&&... fs) {
+ auto ctx = std::make_shared<futures::detail::CollectVariadicContext<
+ typename std::decay<Fs>::type::value_type...>>();
+ futures::detail::collectVariadicHelper<
+ futures::detail::CollectVariadicContext>(ctx, std::forward<Fs>(fs)...);
+ return ctx->p.getFuture();
}
+// collectAny (iterator)
+
template <class InputIterator>
Future<
std::pair<size_t,
Try<
typename
- std::iterator_traits<InputIterator>::value_type::value_type> > >
+ std::iterator_traits<InputIterator>::value_type::value_type>>>
collectAny(InputIterator first, InputIterator last) {
typedef
typename std::iterator_traits<InputIterator>::value_type::value_type T;
- auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
- auto f_saved = ctx->p.getFuture();
+ struct CollectAnyContext {
+ CollectAnyContext() {}
+ Promise<std::pair<size_t, Try<T>>> p;
+ std::atomic<bool> done {false};
+ };
- for (size_t i = 0; first != last; first++, i++) {
- auto& f = *first;
- f.setCallback_([i, ctx](Try<T>&& t) {
- if (!ctx->done.exchange(true)) {
- ctx->p.setValue(std::make_pair(i, std::move(t)));
- }
- ctx->decref();
- });
- }
+ auto ctx = std::make_shared<CollectAnyContext>();
+ mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+ if (!ctx->done.exchange(true)) {
+ ctx->p.setValue(std::make_pair(i, std::move(t)));
+ }
+ });
+ return ctx->p.getFuture();
+}
+
+// collectAnyWithoutException (iterator)
+
+template <class InputIterator>
+Future<std::pair<
+ size_t,
+ typename std::iterator_traits<InputIterator>::value_type::value_type>>
+collectAnyWithoutException(InputIterator first, InputIterator last) {
+ typedef
+ typename std::iterator_traits<InputIterator>::value_type::value_type T;
+
+ struct CollectAnyWithoutExceptionContext {
+ CollectAnyWithoutExceptionContext(){}
+ Promise<std::pair<size_t, T>> p;
+ std::atomic<bool> done{false};
+ std::atomic<size_t> nFulfilled{0};
+ size_t nTotal;
+ };
- return f_saved;
+ auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
+ ctx->nTotal = size_t(std::distance(first, last));
+
+ mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+ if (!t.hasException() && !ctx->done.exchange(true)) {
+ ctx->p.setValue(std::make_pair(i, std::move(t.value())));
+ } else if (++ctx->nFulfilled == ctx->nTotal) {
+ ctx->p.setException(t.exception());
+ }
+ });
+ return ctx->p.getFuture();
}
+// collectN (iterator)
+
template <class InputIterator>
Future<std::vector<std::pair<size_t, Try<typename
std::iterator_traits<InputIterator>::value_type::value_type>>>>
std::iterator_traits<InputIterator>::value_type::value_type T;
typedef std::vector<std::pair<size_t, Try<T>>> V;
- struct ctx_t {
+ struct CollectNContext {
V v;
- size_t completed;
+ std::atomic<size_t> completed = {0};
Promise<V> p;
};
- auto ctx = std::make_shared<ctx_t>();
- ctx->completed = 0;
-
- // for each completed Future, increase count and add to vector, until we
- // have n completed futures at which point we fulfill our Promise with the
- // vector
- auto it = first;
- size_t i = 0;
- while (it != last) {
- it->then([ctx, n, i](Try<T>&& t) {
- auto& v = ctx->v;
+ auto ctx = std::make_shared<CollectNContext>();
+
+ if (size_t(std::distance(first, last)) < n) {
+ ctx->p.setException(std::runtime_error("Not enough futures"));
+ } else {
+ // for each completed Future, increase count and add to vector, until we
+ // have n completed futures at which point we fulfil our Promise with the
+ // vector
+ mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
auto c = ++ctx->completed;
if (c <= n) {
assert(ctx->v.size() < n);
- v.push_back(std::make_pair(i, std::move(t)));
+ ctx->v.emplace_back(i, std::move(t));
if (c == n) {
- ctx->p.setTry(Try<V>(std::move(v)));
+ ctx->p.setTry(Try<V>(std::move(ctx->v)));
}
}
});
-
- it++;
- i++;
- }
-
- if (i < n) {
- ctx->p.setException(std::runtime_error("Not enough futures"));
}
return ctx->p.getFuture();
}
-template <class It, class T, class F, class ItT, class Arg>
-typename std::enable_if<!isFutureResult<F, T, Arg>::value, Future<T>>::type
-reduce(It first, It last, T initial, F func) {
+// reduce (iterator)
+
+template <class It, class T, class F>
+Future<T> reduce(It first, It last, T&& initial, F&& func) {
if (first == last) {
return makeFuture(std::move(initial));
}
+ typedef typename std::iterator_traits<It>::value_type::value_type ItT;
+ typedef typename std::conditional<
+ futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
+ Try<ItT>,
+ ItT>::type Arg;
typedef isTry<Arg> IsTry;
- return collectAll(first, last)
- .then([initial, func](std::vector<Try<ItT>>& vals) mutable {
- for (auto& val : vals) {
- initial = func(std::move(initial),
- // Either return a ItT&& or a Try<ItT>&& depending
- // on the type of the argument of func.
- val.template get<IsTry::value, Arg&&>());
- }
- return initial;
+ auto sfunc = std::make_shared<F>(std::move(func));
+
+ auto f = first->then(
+ [ minitial = std::move(initial), sfunc ](Try<ItT> & head) mutable {
+ return (*sfunc)(
+ std::move(minitial), head.template get<IsTry::value, Arg&&>());
+ });
+
+ for (++first; first != last; ++first) {
+ f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
+ return (*sfunc)(std::move(std::get<0>(t).value()),
+ // Either return a ItT&& or a Try<ItT>&& depending
+ // on the type of the argument of func.
+ std::get<1>(t).template get<IsTry::value, Arg&&>());
});
+ }
+
+ return f;
+}
+
+// window (collection)
+
+template <class Collection, class F, class ItT, class Result>
+std::vector<Future<Result>>
+window(Collection input, F func, size_t n) {
+ // Use global inline executor singleton
+ auto executor = &InlineExecutor::instance();
+ return window(executor, std::move(input), std::move(func), n);
+}
+
+template <class Collection, class F, class ItT, class Result>
+std::vector<Future<Result>>
+window(Executor* executor, Collection input, F func, size_t n) {
+ struct WindowContext {
+ WindowContext(Executor* executor_, Collection&& input_, F&& func_)
+ : executor(executor_),
+ input(std::move(input_)),
+ promises(input.size()),
+ func(std::move(func_)) {}
+ std::atomic<size_t> i{0};
+ Executor* executor;
+ Collection input;
+ std::vector<Promise<Result>> promises;
+ F func;
+
+ static inline void spawn(std::shared_ptr<WindowContext> ctx) {
+ size_t i = ctx->i++;
+ if (i < ctx->input.size()) {
+ auto fut = ctx->func(std::move(ctx->input[i]));
+ fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
+ const auto executor_ = ctx->executor;
+ executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
+ ctx->promises[i].setTry(std::move(t));
+ // Chain another future onto this one
+ spawn(std::move(ctx));
+ });
+ });
+ }
+ }
+ };
+
+ auto max = std::min(n, input.size());
+
+ auto ctx = std::make_shared<WindowContext>(
+ executor, std::move(input), std::move(func));
+
+ // Start the first n Futures
+ for (size_t i = 0; i < max; ++i) {
+ executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
+ }
+
+ std::vector<Future<Result>> futures;
+ futures.reserve(ctx->promises.size());
+ for (auto& promise : ctx->promises) {
+ futures.emplace_back(promise.getFuture());
+ }
+
+ return futures;
+}
+
+// reduce
+
+template <class T>
+template <class I, class F>
+Future<I> Future<T>::reduce(I&& initial, F&& func) {
+ return then([
+ minitial = std::forward<I>(initial),
+ mfunc = std::forward<F>(func)
+ ](T& vals) mutable {
+ auto ret = std::move(minitial);
+ for (auto& val : vals) {
+ ret = mfunc(std::move(ret), std::move(val));
+ }
+ return ret;
+ });
}
+// unorderedReduce (iterator)
+
template <class It, class T, class F, class ItT, class Arg>
-typename std::enable_if<isFutureResult<F, T, Arg>::value, Future<T>>::type
-reduce(It first, It last, T initial, F func) {
+Future<T> unorderedReduce(It first, It last, T initial, F func) {
if (first == last) {
return makeFuture(std::move(initial));
}
typedef isTry<Arg> IsTry;
- auto f = first->then([initial, func](Try<ItT>& head) mutable {
- return func(std::move(initial),
- head.template get<IsTry::value, Arg&&>());
- });
+ struct UnorderedReduceContext {
+ UnorderedReduceContext(T&& memo, F&& fn, size_t n)
+ : lock_(), memo_(makeFuture<T>(std::move(memo))),
+ func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
+ {}
+ folly::MicroSpinLock lock_; // protects memo_ and numThens_
+ Future<T> memo_;
+ F func_;
+ size_t numThens_; // how many Futures completed and called .then()
+ size_t numFutures_; // how many Futures in total
+ Promise<T> promise_;
+ };
- for (++first; first != last; ++first) {
- f = collectAll(f, *first).then([func](std::tuple<Try<T>, Try<ItT>>& t) {
- return func(std::move(std::get<0>(t).value()),
- // Either return a ItT&& or a Try<ItT>&& depending
- // on the type of the argument of func.
- std::get<1>(t).template get<IsTry::value, Arg&&>());
- });
- }
+ auto ctx = std::make_shared<UnorderedReduceContext>(
+ std::move(initial), std::move(func), std::distance(first, last));
+
+ mapSetCallback<ItT>(
+ first,
+ last,
+ [ctx](size_t /* i */, Try<ItT>&& t) {
+ // Futures can be completed in any order, simultaneously.
+ // To make this non-blocking, we create a new Future chain in
+ // the order of completion to reduce the values.
+ // The spinlock just protects chaining a new Future, not actually
+ // executing the reduce, which should be really fast.
+ folly::MSLGuard lock(ctx->lock_);
+ ctx->memo_ =
+ ctx->memo_.then([ ctx, mt = std::move(t) ](T && v) mutable {
+ // Either return a ItT&& or a Try<ItT>&& depending
+ // on the type of the argument of func.
+ return ctx->func_(std::move(v),
+ mt.template get<IsTry::value, Arg&&>());
+ });
+ if (++ctx->numThens_ == ctx->numFutures_) {
+ // After reducing the value of the last Future, fulfill the Promise
+ ctx->memo_.setCallback_(
+ [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
+ }
+ });
- return f;
+ return ctx->promise_.getFuture();
}
+// within
+
template <class T>
Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
return within(dur, TimedOut(), tk);
Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
struct Context {
- Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
+ Context(E ex) : exception(std::move(ex)), promise() {}
E exception;
+ Future<Unit> thisFuture;
Promise<T> promise;
- std::atomic<bool> token;
+ std::atomic<bool> token {false};
};
- auto ctx = std::make_shared<Context>(std::move(e));
+ if (this->isReady()) {
+ return std::move(*this);
+ }
+
+ std::shared_ptr<Timekeeper> tks;
if (!tk) {
- tk = folly::detail::getTimekeeperSingleton();
+ tks = folly::detail::getTimekeeperSingleton();
+ tk = DCHECK_NOTNULL(tks.get());
}
- tk->after(dur)
- .then([ctx](Try<void> const& t) {
- if (ctx->token.exchange(true) == false) {
- if (t.hasException()) {
- ctx->promise.setException(std::move(t.exception()));
- } else {
- ctx->promise.setException(std::move(ctx->exception));
- }
- }
- });
+ auto ctx = std::make_shared<Context>(std::move(e));
- this->then([ctx](Try<T>&& t) {
+ ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
if (ctx->token.exchange(true) == false) {
ctx->promise.setTry(std::move(t));
}
});
- return ctx->promise.getFuture();
+ // Have time keeper use a weak ptr to hold ctx,
+ // so that ctx can be deallocated as soon as the future job finished.
+ tk->after(dur).then([weakCtx = to_weak_ptr(ctx)](Try<Unit> const& t) mutable {
+ auto lockedCtx = weakCtx.lock();
+ if (!lockedCtx) {
+ // ctx already released. "this" completed first, cancel "after"
+ return;
+ }
+ // "after" completed first, cancel "this"
+ lockedCtx->thisFuture.raise(TimedOut());
+ if (lockedCtx->token.exchange(true) == false) {
+ if (t.hasException()) {
+ lockedCtx->promise.setException(std::move(t.exception()));
+ } else {
+ lockedCtx->promise.setException(std::move(lockedCtx->exception));
+ }
+ }
+ });
+
+ return ctx->promise.getFuture().via(this->getExecutor());
}
+// delayed
+
template <class T>
Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
return collectAll(*this, futures::sleep(dur, tk))
- .then([](std::tuple<Try<T>, Try<void>> tup) {
+ .then([](std::tuple<Try<T>, Try<Unit>> tup) {
Try<T>& t = std::get<0>(tup);
return makeFuture<T>(std::move(t));
});
}
+namespace futures {
namespace detail {
-template <class T>
-void waitImpl(Future<T>& f) {
+template <class FutureType, typename T = typename FutureType::value_type>
+void waitImpl(FutureType& f) {
// short-circuit if there's nothing to do
- if (f.isReady()) return;
+ if (f.isReady()) {
+ return;
+ }
- folly::fibers::Baton baton;
- f = f.then([&](Try<T> t) {
- baton.post();
- return makeFuture(std::move(t));
- });
+ FutureBatonType baton;
+ f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
baton.wait();
-
- // There's a race here between the return here and the actual finishing of
- // the future. f is completed, but the setup may not have finished on done
- // after the baton has posted.
- while (!f.isReady()) {
- std::this_thread::yield();
- }
+ assert(f.isReady());
}
-template <class T>
-void waitImpl(Future<T>& f, Duration dur) {
+template <class FutureType, typename T = typename FutureType::value_type>
+void waitImpl(FutureType& f, Duration dur) {
// short-circuit if there's nothing to do
- if (f.isReady()) return;
+ if (f.isReady()) {
+ return;
+ }
- auto baton = std::make_shared<folly::fibers::Baton>();
- f = f.then([baton](Try<T> t) {
+ Promise<T> promise;
+ auto ret = promise.getFuture();
+ auto baton = std::make_shared<FutureBatonType>();
+ f.setCallback_([ baton, promise = std::move(promise) ](Try<T> && t) mutable {
+ promise.setTry(std::move(t));
baton->post();
- return makeFuture(std::move(t));
});
-
- // Let's preserve the invariant that if we did not timeout (timed_wait returns
- // true), then the returned Future is complete when it is returned to the
- // caller. We need to wait out the race for that Future to complete.
+ f = std::move(ret);
if (baton->timed_wait(dur)) {
- while (!f.isReady()) {
- std::this_thread::yield();
- }
+ assert(f.isReady());
}
}
template <class T>
void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
+ // Set callback so to ensure that the via executor has something on it
+ // so that once the preceding future triggers this callback, drive will
+ // always have a callback to satisfy it
+ if (f.isReady()) {
+ return;
+ }
+ f = f.via(e).then([](T&& t) { return std::move(t); });
while (!f.isReady()) {
e->drive();
}
+ assert(f.isReady());
}
-} // detail
+} // namespace detail
+} // namespace futures
template <class T>
-Future<T>& Future<T>::wait() & {
- detail::waitImpl(*this);
+SemiFuture<T>& SemiFuture<T>::wait() & {
+ futures::detail::waitImpl(*this);
return *this;
}
template <class T>
-Future<T>&& Future<T>::wait() && {
- detail::waitImpl(*this);
+SemiFuture<T>&& SemiFuture<T>::wait() && {
+ futures::detail::waitImpl(*this);
return std::move(*this);
}
template <class T>
-Future<T>& Future<T>::wait(Duration dur) & {
- detail::waitImpl(*this, dur);
+SemiFuture<T>& SemiFuture<T>::wait(Duration dur) & {
+ futures::detail::waitImpl(*this, dur);
return *this;
}
template <class T>
-Future<T>&& Future<T>::wait(Duration dur) && {
- detail::waitImpl(*this, dur);
+SemiFuture<T>&& SemiFuture<T>::wait(Duration dur) && {
+ futures::detail::waitImpl(*this, dur);
return std::move(*this);
}
template <class T>
-Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
- detail::waitViaImpl(*this, e);
+T SemiFuture<T>::get() {
+ return std::move(wait().value());
+}
+
+template <class T>
+T SemiFuture<T>::get(Duration dur) {
+ wait(dur);
+ if (this->isReady()) {
+ return std::move(this->value());
+ } else {
+ throwTimedOut();
+ }
+}
+
+template <class T>
+Future<T>& Future<T>::wait() & {
+ futures::detail::waitImpl(*this);
return *this;
}
template <class T>
-Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
- detail::waitViaImpl(*this, e);
+Future<T>&& Future<T>::wait() && {
+ futures::detail::waitImpl(*this);
return std::move(*this);
}
template <class T>
-T Future<T>::get() {
- return std::move(wait().value());
+Future<T>& Future<T>::wait(Duration dur) & {
+ futures::detail::waitImpl(*this, dur);
+ return *this;
}
-template <>
-inline void Future<void>::get() {
- wait().value();
+template <class T>
+Future<T>&& Future<T>::wait(Duration dur) && {
+ futures::detail::waitImpl(*this, dur);
+ return std::move(*this);
}
template <class T>
-T Future<T>::get(Duration dur) {
- wait(dur);
- if (isReady()) {
- return std::move(value());
- } else {
- throw TimedOut();
- }
+Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
+ futures::detail::waitViaImpl(*this, e);
+ return *this;
}
-template <>
-inline void Future<void>::get(Duration dur) {
- wait(dur);
- if (isReady()) {
- return;
- } else {
- throw TimedOut();
- }
+template <class T>
+Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
+ futures::detail::waitViaImpl(*this, e);
+ return std::move(*this);
}
template <class T>
return std::move(waitVia(e).value());
}
-template <>
-inline void Future<void>::getVia(DrivableExecutor* e) {
- waitVia(e).value();
-}
+namespace futures {
+namespace detail {
+template <class T>
+struct TryEquals {
+ static bool equals(const Try<T>& t1, const Try<T>& t2) {
+ return t1.value() == t2.value();
+ }
+};
+} // namespace detail
+} // namespace futures
template <class T>
Future<bool> Future<T>::willEqual(Future<T>& f) {
return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
- return std::get<0>(t).value() == std::get<1>(t).value();
+ return futures::detail::TryEquals<T>::equals(
+ std::get<0>(t), std::get<1>(t));
} else {
return false;
}
template <class T>
template <class F>
-Future<T> Future<T>::filter(F predicate) {
- auto p = folly::makeMoveWrapper(std::move(predicate));
- return this->then([p](T val) {
+Future<T> Future<T>::filter(F&& predicate) {
+ return this->then([p = std::forward<F>(predicate)](T val) {
T const& valConstRef = val;
- if (!(*p)(valConstRef)) {
- throw PredicateDoesNotObtain();
+ if (!p(valConstRef)) {
+ throwPredicateDoesNotObtain();
}
return val;
});
}
-namespace futures {
- namespace {
- template <class Z>
- Future<Z> chainHelper(Future<Z> f) {
- return f;
- }
-
- template <class Z, class F, class Fn, class... Callbacks>
- Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
- return chainHelper<Z>(f.then(fn), fns...);
- }
+template <class F>
+inline Future<Unit> when(bool p, F&& thunk) {
+ return p ? std::forward<F>(thunk)().unit() : makeFuture();
+}
+
+template <class P, class F>
+Future<Unit> whileDo(P&& predicate, F&& thunk) {
+ if (predicate()) {
+ auto future = thunk();
+ return future.then([
+ predicate = std::forward<P>(predicate),
+ thunk = std::forward<F>(thunk)
+ ]() mutable {
+ return whileDo(std::forward<P>(predicate), std::forward<F>(thunk));
+ });
}
+ return makeFuture();
+}
- template <class A, class Z, class... Callbacks>
- std::function<Future<Z>(Try<A>)>
- chain(Callbacks... fns) {
- MoveWrapper<Promise<A>> pw;
- MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
- return [=](Try<A> t) mutable {
- pw->setTry(std::move(t));
- return std::move(*fw);
- };
- }
+template <class F>
+Future<Unit> times(const int n, F&& thunk) {
+ return folly::whileDo(
+ [ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
+ return count->fetch_add(1) < n;
+ },
+ std::forward<F>(thunk));
+}
+namespace futures {
template <class It, class F, class ItT, class Result>
std::vector<Future<Result>> map(It first, It last, F func) {
std::vector<Future<Result>> results;
}
}
+// Instantiate the most common Future types to save compile time
+extern template class Future<Unit>;
+extern template class Future<bool>;
+extern template class Future<int>;
+extern template class Future<int64_t>;
+extern template class Future<std::string>;
+extern template class Future<double>;
} // namespace folly
-
-// I haven't included a Future<T&> specialization because I don't forsee us
-// using it, however it is not difficult to add when needed. Refer to
-// Future<void> for guidance. std::future and boost::future code would also be
-// instructive.