#include <folly/Baton.h>
#include <folly/Optional.h>
+#include <folly/futures/InlineExecutor.h>
#include <folly/futures/Timekeeper.h>
#include <folly/futures/detail/Core.h>
return makeSemiFuture(Try<typename std::decay<T>::type>(std::forward<T>(t)));
}
-inline SemiFuture<Unit> makeSemiFuture() {
- return makeSemiFuture(Unit{});
-}
-
// makeSemiFutureWith(SemiFuture<T>()) -> SemiFuture<T>
template <class F>
typename std::enable_if<
detach();
}
+// This must be defined after the constructors to avoid a bug in MSVC
+// https://connect.microsoft.com/VisualStudio/feedback/details/3142777/out-of-line-constructor-definition-after-implicit-reference-causes-incorrect-c2244
+inline SemiFuture<Unit> makeSemiFuture() {
+ return makeSemiFuture(Unit{});
+}
+
template <class T>
T& SemiFuture<T>::value() & {
throwIfInvalid();
template <class T>
void SemiFuture<T>::throwIfInvalid() const {
- if (!core_)
+ if (!core_) {
throwNoState();
}
+}
template <class T>
Optional<Try<T>> SemiFuture<T>::poll() {
template <class Collection, class F, class ItT, class Result>
std::vector<Future<Result>>
window(Collection input, F func, size_t n) {
- struct WindowContext {
- WindowContext(Collection&& i, F&& fn)
- : input_(std::move(i)), promises_(input_.size()),
- func_(std::move(fn))
- {}
- std::atomic<size_t> i_ {0};
- Collection input_;
- std::vector<Promise<Result>> promises_;
- F func_;
+ // Use global inline executor singleton
+ auto executor = &InlineExecutor::instance();
+ return window(executor, std::move(input), std::move(func), n);
+}
- static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
- size_t i = ctx->i_++;
- if (i < ctx->input_.size()) {
- // Using setCallback_ directly since we don't need the Future
- ctx->func_(std::move(ctx->input_[i])).setCallback_(
- // ctx is captured by value
- [ctx, i](Try<Result>&& t) {
- ctx->promises_[i].setTry(std::move(t));
+template <class Collection, class F, class ItT, class Result>
+std::vector<Future<Result>>
+window(Executor* executor, Collection input, F func, size_t n) {
+ struct WindowContext {
+ WindowContext(Executor* executor_, Collection&& input_, F&& func_)
+ : executor(executor_),
+ input(std::move(input_)),
+ promises(input.size()),
+ func(std::move(func_)) {}
+ std::atomic<size_t> i{0};
+ Executor* executor;
+ Collection input;
+ std::vector<Promise<Result>> promises;
+ F func;
+
+ static inline void spawn(std::shared_ptr<WindowContext> ctx) {
+ size_t i = ctx->i++;
+ if (i < ctx->input.size()) {
+ auto fut = ctx->func(std::move(ctx->input[i]));
+ fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
+ const auto executor_ = ctx->executor;
+ executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
+ ctx->promises[i].setTry(std::move(t));
// Chain another future onto this one
spawn(std::move(ctx));
});
+ });
}
}
};
auto max = std::min(n, input.size());
auto ctx = std::make_shared<WindowContext>(
- std::move(input), std::move(func));
+ executor, std::move(input), std::move(func));
+ // Start the first n Futures
for (size_t i = 0; i < max; ++i) {
- // Start the first n Futures
- WindowContext::spawn(ctx);
+ executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
}
std::vector<Future<Result>> futures;
- futures.reserve(ctx->promises_.size());
- for (auto& promise : ctx->promises_) {
+ futures.reserve(ctx->promises.size());
+ for (auto& promise : ctx->promises) {
futures.emplace_back(promise.getFuture());
}
template <class FutureType, typename T = typename FutureType::value_type>
void waitImpl(FutureType& f) {
// short-circuit if there's nothing to do
- if (f.isReady()) return;
+ if (f.isReady()) {
+ return;
+ }
FutureBatonType baton;
f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
// Set callback so to ensure that the via executor has something on it
// so that once the preceding future triggers this callback, drive will
// always have a callback to satisfy it
- if (f.isReady())
+ if (f.isReady()) {
return;
+ }
f = f.via(e).then([](T&& t) { return std::move(t); });
while (!f.isReady()) {
e->drive();