/*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#pragma once
+#include <algorithm>
+#include <cassert>
#include <chrono>
+#include <random>
#include <thread>
-
#include <folly/Baton.h>
+#include <folly/Optional.h>
+#include <folly/Random.h>
+#include <folly/Traits.h>
#include <folly/futures/detail/Core.h>
#include <folly/futures/Timekeeper.h>
+#if defined(__ANDROID__) || defined(__APPLE__)
+#define FOLLY_FUTURE_USING_FIBER 0
+#else
+#define FOLLY_FUTURE_USING_FIBER 1
+#include <folly/experimental/fibers/Baton.h>
+#endif
+
namespace folly {
class Timekeeper;
namespace detail {
- Timekeeper* getTimekeeperSingleton();
+#if FOLLY_FUTURE_USING_FIBER
+typedef folly::fibers::Baton FutureBatonType;
+#else
+typedef folly::Baton<> FutureBatonType;
+#endif
+}
+
+namespace detail {
+ std::shared_ptr<Timekeeper> getTimekeeperSingleton();
}
template <class T>
-Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
- *this = std::move(other);
+Future<T>::Future(Future<T>&& other) noexcept : core_(other.core_) {
+ other.core_ = nullptr;
}
template <class T>
-Future<T>& Future<T>::operator=(Future<T>&& other) {
+Future<T>& Future<T>::operator=(Future<T>&& other) noexcept {
std::swap(core_, other.core_);
return *this;
}
template <class T>
-template <class F>
-Future<T>::Future(
- const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
- : core_(nullptr) {
- Promise<F> p;
- p.setValue(val);
- *this = p.getFuture();
-}
+template <class T2, typename>
+Future<T>::Future(T2&& val)
+ : core_(new detail::Core<T>(Try<T>(std::forward<T2>(val)))) {}
template <class T>
-template <class F>
-Future<T>::Future(
- typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
- : core_(nullptr) {
- Promise<F> p;
- p.setValue(std::forward<F>(val));
- *this = p.getFuture();
-}
-
-template <>
-template <class F,
- typename std::enable_if<std::is_void<F>::value, int>::type>
-Future<void>::Future() : core_(nullptr) {
- Promise<void> p;
- p.setValue();
- *this = p.getFuture();
-}
-
+template <typename, typename>
+Future<T>::Future()
+ : core_(new detail::Core<T>(Try<T>(T()))) {}
template <class T>
Future<T>::~Future() {
core_->setCallback(std::move(func));
}
+// unwrap
+
+template <class T>
+template <class F>
+typename std::enable_if<isFuture<F>::value,
+ Future<typename isFuture<T>::Inner>>::type
+Future<T>::unwrap() {
+ return then([](Future<typename isFuture<T>::Inner> internal_future) {
+ return internal_future;
+ });
+}
+
+// then
+
// Variant: returns a value
// e.g. f.then([](Try<T>&& t){ return t.value(); });
template <class T>
// wrap these so we can move them into the lambda
folly::MoveWrapper<Promise<B>> p;
+ p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
folly::MoveWrapper<F> funcm(std::forward<F>(func));
// grab the Future now before we lose our handle on the Promise
auto f = p->getFuture();
+ f.core_->setExecutorNoLock(getExecutor());
/* This is a bit tricky.
if (!isTry && t.hasException()) {
p->setException(std::move(t.exception()));
} else {
- p->fulfil([&]() {
+ p->setWith([&]() {
return (*funcm)(t.template get<isTry, Args>()...);
});
}
// wrap these so we can move them into the lambda
folly::MoveWrapper<Promise<B>> p;
+ p->core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
folly::MoveWrapper<F> funcm(std::forward<F>(func));
// grab the Future now before we lose our handle on the Promise
auto f = p->getFuture();
+ f.core_->setExecutorNoLock(getExecutor());
setCallback_(
[p, funcm](Try<T>&& t) mutable {
auto f2 = (*funcm)(t.template get<isTry, Args>()...);
// that didn't throw, now we can steal p
f2.setCallback_([p](Try<B>&& b) mutable {
- p->fulfilTry(std::move(b));
+ p->setTry(std::move(b));
});
} catch (const std::exception& e) {
+ p->setException(exception_wrapper(std::current_exception(), e));
+ } catch (...) {
p->setException(exception_wrapper(std::current_exception()));
}
}
}
template <class T>
-Future<void> Future<T>::then() {
- return then([] (Try<T>&& t) {});
+template <class Executor, class Arg, class... Args>
+auto Future<T>::then(Executor* x, Arg&& arg, Args&&... args)
+ -> decltype(this->then(std::forward<Arg>(arg),
+ std::forward<Args>(args)...))
+{
+ auto oldX = getExecutor();
+ setExecutor(x);
+ return this->then(std::forward<Arg>(arg), std::forward<Args>(args)...).
+ via(oldX);
+}
+
+template <class T>
+Future<Unit> Future<T>::then() {
+ return then([] () {});
}
// onError where the callback returns T
template <class T>
template <class F>
typename std::enable_if<
+ !detail::callableWith<F, exception_wrapper>::value &&
!detail::Extract<F>::ReturnsFuture::value,
Future<T>>::type
Future<T>::onError(F&& func) {
"Return type of onError callback must be T or Future<T>");
Promise<T> p;
+ p.core_->setInterruptHandlerNoLock(core_->getInterruptHandler());
auto f = p.getFuture();
auto pm = folly::makeMoveWrapper(std::move(p));
auto funcm = folly::makeMoveWrapper(std::move(func));
setCallback_([pm, funcm](Try<T>&& t) mutable {
if (!t.template withException<Exn>([&] (Exn& e) {
- pm->fulfil([&]{
+ pm->setWith([&]{
return (*funcm)(e);
});
})) {
- pm->fulfilTry(std::move(t));
+ pm->setTry(std::move(t));
}
});
template <class T>
template <class F>
typename std::enable_if<
+ !detail::callableWith<F, exception_wrapper>::value &&
detail::Extract<F>::ReturnsFuture::value,
Future<T>>::type
Future<T>::onError(F&& func) {
try {
auto f2 = (*funcm)(e);
f2.setCallback_([pm](Try<T>&& t2) mutable {
- pm->fulfilTry(std::move(t2));
+ pm->setTry(std::move(t2));
});
} catch (const std::exception& e2) {
pm->setException(exception_wrapper(std::current_exception(), e2));
pm->setException(exception_wrapper(std::current_exception()));
}
})) {
- pm->fulfilTry(std::move(t));
+ pm->setTry(std::move(t));
}
});
return f;
}
+template <class T>
+template <class F>
+Future<T> Future<T>::ensure(F func) {
+ MoveWrapper<F> funcw(std::move(func));
+ return this->then([funcw](Try<T>&& t) mutable {
+ (*funcw)();
+ return makeFuture(std::move(t));
+ });
+}
+
template <class T>
template <class F>
Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
.onError([funcw](TimedOut const&) { return (*funcw)(); });
}
+template <class T>
+template <class F>
+typename std::enable_if<
+ detail::callableWith<F, exception_wrapper>::value &&
+ detail::Extract<F>::ReturnsFuture::value,
+ Future<T>>::type
+Future<T>::onError(F&& func) {
+ static_assert(
+ std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
+ "Return type of onError callback must be T or Future<T>");
+
+ Promise<T> p;
+ auto f = p.getFuture();
+ auto pm = folly::makeMoveWrapper(std::move(p));
+ auto funcm = folly::makeMoveWrapper(std::move(func));
+ setCallback_([pm, funcm](Try<T> t) mutable {
+ if (t.hasException()) {
+ try {
+ auto f2 = (*funcm)(std::move(t.exception()));
+ f2.setCallback_([pm](Try<T> t2) mutable {
+ pm->setTry(std::move(t2));
+ });
+ } catch (const std::exception& e2) {
+ pm->setException(exception_wrapper(std::current_exception(), e2));
+ } catch (...) {
+ pm->setException(exception_wrapper(std::current_exception()));
+ }
+ } else {
+ pm->setTry(std::move(t));
+ }
+ });
+
+ return f;
+}
+
+// onError(exception_wrapper) that returns T
+template <class T>
+template <class F>
+typename std::enable_if<
+ detail::callableWith<F, exception_wrapper>::value &&
+ !detail::Extract<F>::ReturnsFuture::value,
+ Future<T>>::type
+Future<T>::onError(F&& func) {
+ static_assert(
+ std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
+ "Return type of onError callback must be T or Future<T>");
+
+ Promise<T> p;
+ auto f = p.getFuture();
+ auto pm = folly::makeMoveWrapper(std::move(p));
+ auto funcm = folly::makeMoveWrapper(std::move(func));
+ setCallback_([pm, funcm](Try<T> t) mutable {
+ if (t.hasException()) {
+ pm->setWith([&]{
+ return (*funcm)(std::move(t.exception()));
+ });
+ } else {
+ pm->setTry(std::move(t));
+ }
+ });
+
+ return f;
+}
+
template <class T>
typename std::add_lvalue_reference<T>::type Future<T>::value() {
throwIfInvalid();
}
template <class T>
-template <typename Executor>
-inline Future<T> Future<T>::via(Executor* executor) && {
+Optional<Try<T>> Future<T>::poll() {
+ Optional<Try<T>> o;
+ if (core_->ready()) {
+ o = std::move(core_->getTry());
+ }
+ return o;
+}
+
+template <class T>
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
throwIfInvalid();
- this->deactivate();
- core_->setExecutor(executor);
+ setExecutor(executor, priority);
return std::move(*this);
}
template <class T>
-template <typename Executor>
-inline Future<T> Future<T>::via(Executor* executor) & {
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
throwIfInvalid();
MoveWrapper<Promise<T>> p;
auto f = p->getFuture();
- then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
- return std::move(f).via(executor);
+ then([p](Try<T>&& t) mutable { p->setTry(std::move(t)); });
+ return std::move(f).via(executor, priority);
+}
+
+
+template <class Func>
+auto via(Executor* x, Func func)
+ -> Future<typename isFuture<decltype(func())>::Inner>
+{
+ // TODO make this actually more performant. :-P #7260175
+ return via(x).then(func);
}
template <class T>
return core_->ready();
}
+template <class T>
+bool Future<T>::hasValue() {
+ return getTry().hasValue();
+}
+
+template <class T>
+bool Future<T>::hasException() {
+ return getTry().hasException();
+}
+
template <class T>
void Future<T>::raise(exception_wrapper exception) {
core_->raise(std::move(exception));
template <class T>
Future<typename std::decay<T>::type> makeFuture(T&& t) {
- Promise<typename std::decay<T>::type> p;
- p.setValue(std::forward<T>(t));
- return p.getFuture();
+ return makeFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
}
inline // for multiple translation units
-Future<void> makeFuture() {
- Promise<void> p;
- p.setValue();
- return p.getFuture();
+Future<Unit> makeFuture() {
+ return makeFuture(Unit{});
}
+// makeFutureWith(Future<T>()) -> Future<T>
template <class F>
-auto makeFutureTry(
- F&& func,
- typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
- -> Future<decltype(func())> {
- Promise<decltype(func())> p;
- p.fulfil(
- [&func]() {
- return (func)();
- });
- return p.getFuture();
+typename std::enable_if<isFuture<typename std::result_of<F()>::type>::value,
+ typename std::result_of<F()>::type>::type
+makeFutureWith(F&& func) {
+ using InnerType =
+ typename isFuture<typename std::result_of<F()>::type>::Inner;
+ try {
+ return func();
+ } catch (std::exception& e) {
+ return makeFuture<InnerType>(
+ exception_wrapper(std::current_exception(), e));
+ } catch (...) {
+ return makeFuture<InnerType>(exception_wrapper(std::current_exception()));
+ }
}
+// makeFutureWith(T()) -> Future<T>
+// makeFutureWith(void()) -> Future<Unit>
template <class F>
-auto makeFutureTry(F const& func) -> Future<decltype(func())> {
- F copy = func;
- return makeFutureTry(std::move(copy));
+typename std::enable_if<
+ !(isFuture<typename std::result_of<F()>::type>::value),
+ Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
+makeFutureWith(F&& func) {
+ using LiftedResult =
+ typename Unit::Lift<typename std::result_of<F()>::type>::type;
+ return makeFuture<LiftedResult>(makeTryWith([&func]() mutable {
+ return func();
+ }));
}
template <class T>
Future<T> makeFuture(std::exception_ptr const& e) {
- Promise<T> p;
- p.setException(e);
- return p.getFuture();
+ return makeFuture(Try<T>(e));
}
template <class T>
Future<T> makeFuture(exception_wrapper ew) {
- Promise<T> p;
- p.setException(std::move(ew));
- return p.getFuture();
+ return makeFuture(Try<T>(std::move(ew)));
}
template <class T, class E>
typename std::enable_if<std::is_base_of<std::exception, E>::value,
Future<T>>::type
makeFuture(E const& e) {
- Promise<T> p;
- p.setException(make_exception_wrapper<E>(e));
- return p.getFuture();
+ return makeFuture(Try<T>(make_exception_wrapper<E>(e)));
}
template <class T>
Future<T> makeFuture(Try<T>&& t) {
- Promise<typename std::decay<T>::type> p;
- p.fulfilTry(std::move(t));
- return p.getFuture();
+ return Future<T>(new detail::Core<T>(std::move(t)));
}
-template <>
-inline Future<void> makeFuture(Try<void>&& t) {
- if (t.hasException()) {
- return makeFuture<void>(std::move(t.exception()));
- } else {
- return makeFuture();
- }
+// via
+Future<Unit> via(Executor* executor, int8_t priority) {
+ return makeFuture().via(executor, priority);
}
-// via
-template <typename Executor>
-Future<void> via(Executor* executor) {
- return makeFuture().via(executor);
+// mapSetCallback calls func(i, Try<T>) when every future completes
+
+template <class T, class InputIterator, class F>
+void mapSetCallback(InputIterator first, InputIterator last, F func) {
+ for (size_t i = 0; first != last; ++first, ++i) {
+ first->setCallback_([func, i](Try<T>&& t) {
+ func(i, std::move(t));
+ });
+ }
}
-// when (variadic)
+// collectAll (variadic)
template <typename... Fs>
-typename detail::VariadicContext<
+typename detail::CollectAllVariadicContext<
typename std::decay<Fs>::type::value_type...>::type
-whenAll(Fs&&... fs)
-{
- auto ctx =
- new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
- ctx->total = sizeof...(fs);
- auto f_saved = ctx->p.getFuture();
- detail::whenAllVariadicHelper(ctx,
- std::forward<typename std::decay<Fs>::type>(fs)...);
- return f_saved;
+collectAll(Fs&&... fs) {
+ auto ctx = std::make_shared<detail::CollectAllVariadicContext<
+ typename std::decay<Fs>::type::value_type...>>();
+ detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
+ ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
+ return ctx->p.getFuture();
}
-// when (iterator)
+// collectAll (iterator)
template <class InputIterator>
Future<
std::vector<
Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
-whenAll(InputIterator first, InputIterator last)
-{
+collectAll(InputIterator first, InputIterator last) {
typedef
typename std::iterator_traits<InputIterator>::value_type::value_type T;
- if (first >= last) {
- return makeFuture(std::vector<Try<T>>());
- }
- size_t n = std::distance(first, last);
+ struct CollectAllContext {
+ CollectAllContext(int n) : results(n) {}
+ ~CollectAllContext() {
+ p.setValue(std::move(results));
+ }
+ Promise<std::vector<Try<T>>> p;
+ std::vector<Try<T>> results;
+ };
- auto ctx = new detail::WhenAllContext<T>();
+ auto ctx = std::make_shared<CollectAllContext>(std::distance(first, last));
+ mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+ ctx->results[i] = std::move(t);
+ });
+ return ctx->p.getFuture();
+}
- ctx->results.resize(n);
+// collect (iterator)
- auto f_saved = ctx->p.getFuture();
+namespace detail {
- for (size_t i = 0; first != last; ++first, ++i) {
- assert(i < n);
- auto& f = *first;
- f.setCallback_([ctx, i, n](Try<T>&& t) {
- ctx->results[i] = std::move(t);
- if (++ctx->count == n) {
- ctx->p.setValue(std::move(ctx->results));
- delete ctx;
- }
- });
+template <typename T>
+struct CollectContext {
+ struct Nothing {
+ explicit Nothing(int /* n */) {}
+ };
+
+ using Result = typename std::conditional<
+ std::is_void<T>::value,
+ void,
+ std::vector<T>>::type;
+
+ using InternalResult = typename std::conditional<
+ std::is_void<T>::value,
+ Nothing,
+ std::vector<Optional<T>>>::type;
+
+ explicit CollectContext(int n) : result(n) {}
+ ~CollectContext() {
+ if (!threw.exchange(true)) {
+ // map Optional<T> -> T
+ std::vector<T> finalResult;
+ finalResult.reserve(result.size());
+ std::transform(result.begin(), result.end(),
+ std::back_inserter(finalResult),
+ [](Optional<T>& o) { return std::move(o.value()); });
+ p.setValue(std::move(finalResult));
+ }
+ }
+ inline void setPartialResult(size_t i, Try<T>& t) {
+ result[i] = std::move(t.value());
}
+ Promise<Result> p;
+ InternalResult result;
+ std::atomic<bool> threw {false};
+};
+
+}
+
+template <class InputIterator>
+Future<typename detail::CollectContext<
+ typename std::iterator_traits<InputIterator>::value_type::value_type>::Result>
+collect(InputIterator first, InputIterator last) {
+ typedef
+ typename std::iterator_traits<InputIterator>::value_type::value_type T;
- return f_saved;
+ auto ctx = std::make_shared<detail::CollectContext<T>>(
+ std::distance(first, last));
+ mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+ if (t.hasException()) {
+ if (!ctx->threw.exchange(true)) {
+ ctx->p.setException(std::move(t.exception()));
+ }
+ } else if (!ctx->threw) {
+ ctx->setPartialResult(i, t);
+ }
+ });
+ return ctx->p.getFuture();
+}
+
+// collect (variadic)
+
+template <typename... Fs>
+typename detail::CollectVariadicContext<
+ typename std::decay<Fs>::type::value_type...>::type
+collect(Fs&&... fs) {
+ auto ctx = std::make_shared<detail::CollectVariadicContext<
+ typename std::decay<Fs>::type::value_type...>>();
+ detail::collectVariadicHelper<detail::CollectVariadicContext>(
+ ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
+ return ctx->p.getFuture();
}
+// collectAny (iterator)
+
template <class InputIterator>
Future<
std::pair<size_t,
Try<
typename
- std::iterator_traits<InputIterator>::value_type::value_type> > >
-whenAny(InputIterator first, InputIterator last) {
+ std::iterator_traits<InputIterator>::value_type::value_type>>>
+collectAny(InputIterator first, InputIterator last) {
typedef
typename std::iterator_traits<InputIterator>::value_type::value_type T;
- auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
- auto f_saved = ctx->p.getFuture();
-
- for (size_t i = 0; first != last; first++, i++) {
- auto& f = *first;
- f.setCallback_([i, ctx](Try<T>&& t) {
- if (!ctx->done.exchange(true)) {
- ctx->p.setValue(std::make_pair(i, std::move(t)));
- }
- ctx->decref();
- });
- }
+ struct CollectAnyContext {
+ CollectAnyContext() {};
+ Promise<std::pair<size_t, Try<T>>> p;
+ std::atomic<bool> done {false};
+ };
- return f_saved;
+ auto ctx = std::make_shared<CollectAnyContext>();
+ mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+ if (!ctx->done.exchange(true)) {
+ ctx->p.setValue(std::make_pair(i, std::move(t)));
+ }
+ });
+ return ctx->p.getFuture();
}
+// collectN (iterator)
+
template <class InputIterator>
Future<std::vector<std::pair<size_t, Try<typename
std::iterator_traits<InputIterator>::value_type::value_type>>>>
-whenN(InputIterator first, InputIterator last, size_t n) {
+collectN(InputIterator first, InputIterator last, size_t n) {
typedef typename
std::iterator_traits<InputIterator>::value_type::value_type T;
typedef std::vector<std::pair<size_t, Try<T>>> V;
- struct ctx_t {
+ struct CollectNContext {
V v;
- size_t completed;
+ std::atomic<size_t> completed = {0};
Promise<V> p;
};
- auto ctx = std::make_shared<ctx_t>();
- ctx->completed = 0;
-
- // for each completed Future, increase count and add to vector, until we
- // have n completed futures at which point we fulfil our Promise with the
- // vector
- auto it = first;
- size_t i = 0;
- while (it != last) {
- it->then([ctx, n, i](Try<T>&& t) {
- auto& v = ctx->v;
+ auto ctx = std::make_shared<CollectNContext>();
+
+ if (size_t(std::distance(first, last)) < n) {
+ ctx->p.setException(std::runtime_error("Not enough futures"));
+ } else {
+ // for each completed Future, increase count and add to vector, until we
+ // have n completed futures at which point we fulfil our Promise with the
+ // vector
+ mapSetCallback<T>(first, last, [ctx, n](size_t i, Try<T>&& t) {
auto c = ++ctx->completed;
if (c <= n) {
assert(ctx->v.size() < n);
- v.push_back(std::make_pair(i, std::move(t)));
+ ctx->v.emplace_back(i, std::move(t));
if (c == n) {
- ctx->p.fulfilTry(Try<V>(std::move(v)));
+ ctx->p.setTry(Try<V>(std::move(ctx->v)));
}
}
});
-
- it++;
- i++;
- }
-
- if (i < n) {
- ctx->p.setException(std::runtime_error("Not enough futures"));
}
return ctx->p.getFuture();
}
-namespace {
- template <class T>
- void getWaitHelper(Future<T>* f) {
- // If we already have a value do the cheap thing
- if (f->isReady()) {
- return;
- }
+// reduce (iterator)
+
+template <class It, class T, class F>
+Future<T> reduce(It first, It last, T&& initial, F&& func) {
+ if (first == last) {
+ return makeFuture(std::move(initial));
+ }
+
+ typedef typename std::iterator_traits<It>::value_type::value_type ItT;
+ typedef typename std::conditional<
+ detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type Arg;
+ typedef isTry<Arg> IsTry;
- folly::Baton<> baton;
- f->then([&](Try<T> const&) {
- baton.post();
+ folly::MoveWrapper<T> minitial(std::move(initial));
+ auto sfunc = std::make_shared<F>(std::move(func));
+
+ auto f = first->then([minitial, sfunc](Try<ItT>& head) mutable {
+ return (*sfunc)(std::move(*minitial),
+ head.template get<IsTry::value, Arg&&>());
+ });
+
+ for (++first; first != last; ++first) {
+ f = collectAll(f, *first).then([sfunc](std::tuple<Try<T>, Try<ItT>>& t) {
+ return (*sfunc)(std::move(std::get<0>(t).value()),
+ // Either return a ItT&& or a Try<ItT>&& depending
+ // on the type of the argument of func.
+ std::get<1>(t).template get<IsTry::value, Arg&&>());
});
- baton.wait();
}
- template <class T>
- Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
- // TODO make and use variadic whenAny #5877971
- Promise<T> p;
- auto token = std::make_shared<std::atomic<bool>>();
- folly::Baton<> baton;
-
- folly::detail::getTimekeeperSingleton()->after(dur)
- .then([&,token](Try<void> const& t) {
- if (token->exchange(true) == false) {
- if (t.hasException()) {
- p.setException(std::move(t.exception()));
- } else {
- p.setException(TimedOut());
- }
- baton.post();
- }
- });
+ return f;
+}
- f->then([&, token](Try<T>&& t) {
- if (token->exchange(true) == false) {
- p.fulfilTry(std::move(t));
- baton.post();
+// window (collection)
+
+template <class Collection, class F, class ItT, class Result>
+std::vector<Future<Result>>
+window(Collection input, F func, size_t n) {
+ struct WindowContext {
+ WindowContext(Collection&& i, F&& fn)
+ : input_(std::move(i)), promises_(input_.size()),
+ func_(std::move(fn))
+ {}
+ std::atomic<size_t> i_ {0};
+ Collection input_;
+ std::vector<Promise<Result>> promises_;
+ F func_;
+
+ static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
+ size_t i = ctx->i_++;
+ if (i < ctx->input_.size()) {
+ // Using setCallback_ directly since we don't need the Future
+ ctx->func_(std::move(ctx->input_[i])).setCallback_(
+ // ctx is captured by value
+ [ctx, i](Try<Result>&& t) {
+ ctx->promises_[i].setTry(std::move(t));
+ // Chain another future onto this one
+ spawn(std::move(ctx));
+ });
}
- });
+ }
+ };
+
+ auto max = std::min(n, input.size());
- baton.wait();
- return p.getFuture();
+ auto ctx = std::make_shared<WindowContext>(
+ std::move(input), std::move(func));
+
+ for (size_t i = 0; i < max; ++i) {
+ // Start the first n Futures
+ WindowContext::spawn(ctx);
}
-}
-template <class T>
-T Future<T>::get() {
- getWaitHelper(this);
+ std::vector<Future<Result>> futures;
+ futures.reserve(ctx->promises_.size());
+ for (auto& promise : ctx->promises_) {
+ futures.emplace_back(promise.getFuture());
+ }
- // Big assumption here: the then() call above, since it doesn't move out
- // the value, leaves us with a value to return here. This would be a big
- // no-no in user code, but I'm invoking internal developer privilege. This
- // is slightly more efficient (save a move()) especially if there's an
- // exception (save a throw).
- return std::move(value());
+ return futures;
}
-template <>
-inline void Future<void>::get() {
- getWaitHelper(this);
- value();
-}
+// reduce
template <class T>
-T Future<T>::get(Duration dur) {
- return std::move(getWaitTimeoutHelper(this, dur).value());
+template <class I, class F>
+Future<I> Future<T>::reduce(I&& initial, F&& func) {
+ folly::MoveWrapper<I> minitial(std::move(initial));
+ folly::MoveWrapper<F> mfunc(std::move(func));
+ return then([minitial, mfunc](T& vals) mutable {
+ auto ret = std::move(*minitial);
+ for (auto& val : vals) {
+ ret = (*mfunc)(std::move(ret), std::move(val));
+ }
+ return ret;
+ });
}
-template <>
-inline void Future<void>::get(Duration dur) {
- getWaitTimeoutHelper(this, dur).value();
-}
+// unorderedReduce (iterator)
-template <class T>
-T Future<T>::getVia(DrivableExecutor* e) {
- while (!isReady()) {
- e->drive();
+template <class It, class T, class F, class ItT, class Arg>
+Future<T> unorderedReduce(It first, It last, T initial, F func) {
+ if (first == last) {
+ return makeFuture(std::move(initial));
}
- return std::move(value());
-}
-template <>
-inline void Future<void>::getVia(DrivableExecutor* e) {
- while (!isReady()) {
- e->drive();
- }
- value();
+ typedef isTry<Arg> IsTry;
+
+ struct UnorderedReduceContext {
+ UnorderedReduceContext(T&& memo, F&& fn, size_t n)
+ : lock_(), memo_(makeFuture<T>(std::move(memo))),
+ func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
+ {};
+ folly::MicroSpinLock lock_; // protects memo_ and numThens_
+ Future<T> memo_;
+ F func_;
+ size_t numThens_; // how many Futures completed and called .then()
+ size_t numFutures_; // how many Futures in total
+ Promise<T> promise_;
+ };
+
+ auto ctx = std::make_shared<UnorderedReduceContext>(
+ std::move(initial), std::move(func), std::distance(first, last));
+
+ mapSetCallback<ItT>(
+ first,
+ last,
+ [ctx](size_t /* i */, Try<ItT>&& t) {
+ folly::MoveWrapper<Try<ItT>> mt(std::move(t));
+ // Futures can be completed in any order, simultaneously.
+ // To make this non-blocking, we create a new Future chain in
+ // the order of completion to reduce the values.
+ // The spinlock just protects chaining a new Future, not actually
+ // executing the reduce, which should be really fast.
+ folly::MSLGuard lock(ctx->lock_);
+ ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
+ // Either return a ItT&& or a Try<ItT>&& depending
+ // on the type of the argument of func.
+ return ctx->func_(std::move(v),
+ mt->template get<IsTry::value, Arg&&>());
+ });
+ if (++ctx->numThens_ == ctx->numFutures_) {
+ // After reducing the value of the last Future, fulfill the Promise
+ ctx->memo_.setCallback_(
+ [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
+ }
+ });
+
+ return ctx->promise_.getFuture();
}
+// within
+
template <class T>
Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
return within(dur, TimedOut(), tk);
Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
struct Context {
- Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
+ Context(E ex) : exception(std::move(ex)), promise() {}
E exception;
+ Future<Unit> thisFuture;
Promise<T> promise;
- std::atomic<bool> token;
+ std::atomic<bool> token {false};
};
- auto ctx = std::make_shared<Context>(std::move(e));
+ std::shared_ptr<Timekeeper> tks;
if (!tk) {
- tk = folly::detail::getTimekeeperSingleton();
+ tks = folly::detail::getTimekeeperSingleton();
+ tk = DCHECK_NOTNULL(tks.get());
}
- tk->after(dur)
- .then([ctx](Try<void> const& t) {
- if (ctx->token.exchange(true) == false) {
- if (t.hasException()) {
- ctx->promise.setException(std::move(t.exception()));
- } else {
- ctx->promise.setException(std::move(ctx->exception));
- }
- }
- });
+ auto ctx = std::make_shared<Context>(std::move(e));
- this->then([ctx](Try<T>&& t) {
+ ctx->thisFuture = this->then([ctx](Try<T>&& t) mutable {
+ // TODO: "this" completed first, cancel "after"
if (ctx->token.exchange(true) == false) {
- ctx->promise.fulfilTry(std::move(t));
+ ctx->promise.setTry(std::move(t));
}
});
- return ctx->promise.getFuture();
+ tk->after(dur).then([ctx](Try<Unit> const& t) mutable {
+ // "after" completed first, cancel "this"
+ ctx->thisFuture.raise(TimedOut());
+ if (ctx->token.exchange(true) == false) {
+ if (t.hasException()) {
+ ctx->promise.setException(std::move(t.exception()));
+ } else {
+ ctx->promise.setException(std::move(ctx->exception));
+ }
+ }
+ });
+
+ return ctx->promise.getFuture().via(getExecutor());
}
+// delayed
+
template <class T>
Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
- return whenAll(*this, futures::sleep(dur, tk))
- .then([](std::tuple<Try<T>, Try<void>> tup) {
+ return collectAll(*this, futures::sleep(dur, tk))
+ .then([](std::tuple<Try<T>, Try<Unit>> tup) {
Try<T>& t = std::get<0>(tup);
return makeFuture<T>(std::move(t));
});
template <class T>
void waitImpl(Future<T>& f) {
- Baton<> baton;
- f = f.then([&](Try<T> t) {
- baton.post();
- return makeFuture(std::move(t));
- });
+ // short-circuit if there's nothing to do
+ if (f.isReady()) return;
+
+ FutureBatonType baton;
+ f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
baton.wait();
- // There's a race here between the return here and the actual finishing of
- // the future. f is completed, but the setup may not have finished on done
- // after the baton has posted.
- while (!f.isReady()) {
- std::this_thread::yield();
- }
+ assert(f.isReady());
}
template <class T>
void waitImpl(Future<T>& f, Duration dur) {
- auto baton = std::make_shared<Baton<>>();
- f = f.then([baton](Try<T> t) {
+ // short-circuit if there's nothing to do
+ if (f.isReady()) return;
+
+ folly::MoveWrapper<Promise<T>> promise;
+ auto ret = promise->getFuture();
+ auto baton = std::make_shared<FutureBatonType>();
+ f.setCallback_([baton, promise](Try<T>&& t) mutable {
+ promise->setTry(std::move(t));
baton->post();
- return makeFuture(std::move(t));
});
- // Let's preserve the invariant that if we did not timeout (timed_wait returns
- // true), then the returned Future is complete when it is returned to the
- // caller. We need to wait out the race for that Future to complete.
- if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
- while (!f.isReady()) {
- std::this_thread::yield();
- }
+ f = std::move(ret);
+ if (baton->timed_wait(dur)) {
+ assert(f.isReady());
}
}
return std::move(*this);
}
-namespace futures {
+template <class T>
+T Future<T>::get() {
+ return std::move(wait().value());
+}
- namespace {
- template <class Z, class F, class... Callbacks>
- Future<Z> chainHelper(F, Callbacks...);
+template <class T>
+T Future<T>::get(Duration dur) {
+ wait(dur);
+ if (isReady()) {
+ return std::move(value());
+ } else {
+ throw TimedOut();
+ }
+}
+
+template <class T>
+T Future<T>::getVia(DrivableExecutor* e) {
+ return std::move(waitVia(e).value());
+}
- template <class Z>
- Future<Z> chainHelper(Future<Z> f) {
- return f;
+namespace detail {
+ template <class T>
+ struct TryEquals {
+ static bool equals(const Try<T>& t1, const Try<T>& t2) {
+ return t1.value() == t2.value();
}
+ };
+}
+
+template <class T>
+Future<bool> Future<T>::willEqual(Future<T>& f) {
+ return collectAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
+ if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
+ return detail::TryEquals<T>::equals(std::get<0>(t), std::get<1>(t));
+ } else {
+ return false;
+ }
+ });
+}
- template <class Z, class F, class Fn, class... Callbacks>
- Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
- return chainHelper<Z>(f.then(fn), fns...);
+template <class T>
+template <class F>
+Future<T> Future<T>::filter(F predicate) {
+ auto p = folly::makeMoveWrapper(std::move(predicate));
+ return this->then([p](T val) {
+ T const& valConstRef = val;
+ if (!(*p)(valConstRef)) {
+ throw PredicateDoesNotObtain();
}
+ return val;
+ });
+}
+
+template <class T>
+template <class Callback>
+auto Future<T>::thenMulti(Callback&& fn)
+ -> decltype(this->then(std::forward<Callback>(fn))) {
+ // thenMulti with one callback is just a then
+ return then(std::forward<Callback>(fn));
+}
+
+template <class T>
+template <class Callback, class... Callbacks>
+auto Future<T>::thenMulti(Callback&& fn, Callbacks&&... fns)
+ -> decltype(this->then(std::forward<Callback>(fn)).
+ thenMulti(std::forward<Callbacks>(fns)...)) {
+ // thenMulti with two callbacks is just then(a).thenMulti(b, ...)
+ return then(std::forward<Callback>(fn)).
+ thenMulti(std::forward<Callbacks>(fns)...);
+}
+
+template <class T>
+template <class Callback, class... Callbacks>
+auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn,
+ Callbacks&&... fns)
+ -> decltype(this->then(std::forward<Callback>(fn)).
+ thenMulti(std::forward<Callbacks>(fns)...)) {
+ // thenMultiExecutor with two callbacks is
+ // via(x).then(a).thenMulti(b, ...).via(oldX)
+ auto oldX = getExecutor();
+ setExecutor(x);
+ return then(std::forward<Callback>(fn)).
+ thenMulti(std::forward<Callbacks>(fns)...).via(oldX);
+}
+
+template <class T>
+template <class Callback>
+auto Future<T>::thenMultiWithExecutor(Executor* x, Callback&& fn)
+ -> decltype(this->then(std::forward<Callback>(fn))) {
+ // thenMulti with one callback is just a then with an executor
+ return then(x, std::forward<Callback>(fn));
+}
+
+template <class F>
+inline Future<Unit> when(bool p, F thunk) {
+ return p ? thunk().unit() : makeFuture();
+}
+
+template <class P, class F>
+Future<Unit> whileDo(P predicate, F thunk) {
+ if (predicate()) {
+ return thunk().then([=] {
+ return whileDo(predicate, thunk);
+ });
}
+ return makeFuture();
+}
- template <class A, class Z, class... Callbacks>
- std::function<Future<Z>(Try<A>)>
- chain(Callbacks... fns) {
- MoveWrapper<Promise<A>> pw;
- MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
- return [=](Try<A> t) mutable {
- pw->fulfilTry(std::move(t));
- return std::move(*fw);
- };
+template <class F>
+Future<Unit> times(const int n, F thunk) {
+ auto count = folly::makeMoveWrapper(
+ std::unique_ptr<std::atomic<int>>(new std::atomic<int>(0))
+ );
+ return folly::whileDo([=]() mutable {
+ return (*count)->fetch_add(1) < n;
+ }, thunk);
+}
+
+namespace futures {
+ template <class It, class F, class ItT, class Result>
+ std::vector<Future<Result>> map(It first, It last, F func) {
+ std::vector<Future<Result>> results;
+ for (auto it = first; it != last; it++) {
+ results.push_back(it->then(func));
+ }
+ return results;
}
+}
+namespace futures {
+
+namespace detail {
+
+struct retrying_policy_raw_tag {};
+struct retrying_policy_fut_tag {};
+
+template <class Policy>
+struct retrying_policy_traits {
+ using ew = exception_wrapper;
+ FOLLY_CREATE_HAS_MEMBER_FN_TRAITS(has_op_call, operator());
+ template <class Ret>
+ using has_op = typename std::integral_constant<bool,
+ has_op_call<Policy, Ret(size_t, const ew&)>::value ||
+ has_op_call<Policy, Ret(size_t, const ew&) const>::value>;
+ using is_raw = has_op<bool>;
+ using is_fut = has_op<Future<bool>>;
+ using tag = typename std::conditional<
+ is_raw::value, retrying_policy_raw_tag, typename std::conditional<
+ is_fut::value, retrying_policy_fut_tag, void>::type>::type;
+};
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(size_t k, Policy&& p, FF&& ff) {
+ using F = typename std::result_of<FF(size_t)>::type;
+ using T = typename F::value_type;
+ auto f = ff(k++);
+ auto pm = makeMoveWrapper(p);
+ auto ffm = makeMoveWrapper(ff);
+ return f.onError([=](exception_wrapper x) mutable {
+ auto q = (*pm)(k, x);
+ auto xm = makeMoveWrapper(std::move(x));
+ return q.then([=](bool r) mutable {
+ return r
+ ? retrying(k, pm.move(), ffm.move())
+ : makeFuture<T>(xm.move());
+ });
+ });
}
-} // namespace folly
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_raw_tag) {
+ auto pm = makeMoveWrapper(std::move(p));
+ auto q = [=](size_t k, exception_wrapper x) {
+ return makeFuture<bool>((*pm)(k, x));
+ };
+ return retrying(0, std::move(q), std::forward<FF>(ff));
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff, retrying_policy_fut_tag) {
+ return retrying(0, std::forward<Policy>(p), std::forward<FF>(ff));
+}
+
+// jittered exponential backoff, clamped to [backoff_min, backoff_max]
+template <class URNG>
+Duration retryingJitteredExponentialBackoffDur(
+ size_t n,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG& rng) {
+ using d = Duration;
+ auto dist = std::normal_distribution<double>(0.0, jitter_param);
+ auto jitter = std::exp(dist(rng));
+ auto backoff = d(d::rep(jitter * backoff_min.count() * std::pow(2, n - 1)));
+ return std::max(backoff_min, std::min(backoff_max, backoff));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG rng,
+ Policy&& p) {
+ auto pm = makeMoveWrapper(std::move(p));
+ auto rngp = std::make_shared<URNG>(std::move(rng));
+ return [=](size_t n, const exception_wrapper& ex) mutable {
+ if (n == max_tries) { return makeFuture(false); }
+ return (*pm)(n, ex).then([=](bool v) {
+ if (!v) { return makeFuture(false); }
+ auto backoff = detail::retryingJitteredExponentialBackoffDur(
+ n, backoff_min, backoff_max, jitter_param, *rngp);
+ return futures::sleep(backoff).then([] { return true; });
+ });
+ };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG rng,
+ Policy&& p,
+ retrying_policy_raw_tag) {
+ auto pm = makeMoveWrapper(std::move(p));
+ auto q = [=](size_t n, const exception_wrapper& e) {
+ return makeFuture((*pm)(n, e));
+ };
+ return retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ std::move(rng),
+ std::move(q));
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG rng,
+ Policy&& p,
+ retrying_policy_fut_tag) {
+ return retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ std::move(rng),
+ std::move(p));
+}
+
+}
+
+template <class Policy, class FF>
+typename std::result_of<FF(size_t)>::type
+retrying(Policy&& p, FF&& ff) {
+ using tag = typename detail::retrying_policy_traits<Policy>::tag;
+ return detail::retrying(std::forward<Policy>(p), std::forward<FF>(ff), tag());
+}
+
+inline
+std::function<bool(size_t, const exception_wrapper&)>
+retryingPolicyBasic(
+ size_t max_tries) {
+ return [=](size_t n, const exception_wrapper&) { return n < max_tries; };
+}
+
+template <class Policy, class URNG>
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param,
+ URNG rng,
+ Policy&& p) {
+ using tag = typename detail::retrying_policy_traits<Policy>::tag;
+ return detail::retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ std::move(rng),
+ std::move(p),
+ tag());
+}
+
+inline
+std::function<Future<bool>(size_t, const exception_wrapper&)>
+retryingPolicyCappedJitteredExponentialBackoff(
+ size_t max_tries,
+ Duration backoff_min,
+ Duration backoff_max,
+ double jitter_param) {
+ auto p = [](size_t, const exception_wrapper&) { return true; };
+ return retryingPolicyCappedJitteredExponentialBackoff(
+ max_tries,
+ backoff_min,
+ backoff_max,
+ jitter_param,
+ ThreadLocalPRNG(),
+ std::move(p));
+}
-// I haven't included a Future<T&> specialization because I don't forsee us
-// using it, however it is not difficult to add when needed. Refer to
-// Future<void> for guidance. std::future and boost::future code would also be
-// instructive.
+}
+
+// Instantiate the most common Future types to save compile time
+extern template class Future<Unit>;
+extern template class Future<bool>;
+extern template class Future<int>;
+extern template class Future<int64_t>;
+extern template class Future<std::string>;
+extern template class Future<double>;
+
+} // namespace folly