#pragma once
-#include "detail.h"
+#include <folly/wangle/detail/State.h>
+#include <folly/LifoSem.h>
namespace folly { namespace wangle {
};
template <class T>
-Future<T>::Future(Future<T>&& other) : obj_(other.obj_) {
- other.obj_ = nullptr;
+Future<T>::Future(Future<T>&& other) noexcept : state_(nullptr) {
+ *this = std::move(other);
}
template <class T>
Future<T>& Future<T>::operator=(Future<T>&& other) {
- std::swap(obj_, other.obj_);
+ std::swap(state_, other.state_);
return *this;
}
template <class T>
Future<T>::~Future() {
- if (obj_) {
- if (obj_->ready()) {
- delete obj_;
- } else {
- setContinuation([](Try<T>&&) {}); // detach
- }
+ detach();
+}
+
+template <class T>
+void Future<T>::detach() {
+ if (state_) {
+ state_->detachFuture();
+ state_ = nullptr;
}
}
template <class T>
void Future<T>::throwIfInvalid() const {
- if (!obj_)
+ if (!state_)
throw NoState();
}
template <class T>
template <class F>
-void Future<T>::setContinuation(F&& func) {
+void Future<T>::setCallback_(F&& func) {
throwIfInvalid();
- obj_->setContinuation(std::move(func));
- obj_ = nullptr;
+ state_->setCallback(std::move(func));
}
template <class T>
sophisticated that avoids making a new Future object when it can, as an
optimization. But this is correct.
- obj_ can't be moved, it is explicitly disallowed (as is copying). But
+ state_ can't be moved, it is explicitly disallowed (as is copying). But
if there's ever a reason to allow it, this is one place that makes that
assumption and would need to be fixed. We use a standard shared pointer
- for obj_ (by copying it in), which means in essence obj holds a shared
+ for state_ (by copying it in), which means in essence obj holds a shared
pointer to itself. But this shouldn't leak because Promise will not
outlive the continuation, because Promise will setException() with a
broken Promise if it is destructed before completed. We could use a
We have to move in the Promise and func using the MoveWrapper
hack. (func could be copied but it's a big drag on perf).
- Two subtle but important points about this design. FutureObject has no
+ Two subtle but important points about this design. detail::State has no
back pointers to Future or Promise, so if Future or Promise get moved
(and they will be moved in performant code) we don't have to do
anything fancy. And because we store the continuation in the
- FutureObject, not in the Future, we can execute the continuation even
+ detail::State, not in the Future, we can execute the continuation even
after the Future has gone out of scope. This is an intentional design
decision. It is likely we will want to be able to cancel a continuation
in some circumstances, but I think it should be explicit not implicit
in the destruction of the Future used to create it.
*/
- setContinuation(
+ setCallback_(
[p, funcm](Try<T>&& t) mutable {
p->fulfil([&]() {
return (*funcm)(std::move(t));
// grab the Future now before we lose our handle on the Promise
auto f = p->getFuture();
- setContinuation(
+ setCallback_(
[p, funcm](Try<T>&& t) mutable {
try {
auto f2 = (*funcm)(std::move(t));
// that didn't throw, now we can steal p
- f2.setContinuation([p](Try<B>&& b) mutable {
+ f2.setCallback_([p](Try<B>&& b) mutable {
p->fulfilTry(std::move(b));
});
} catch (...) {
typename std::add_lvalue_reference<T>::type Future<T>::value() {
throwIfInvalid();
- return obj_->value();
+ return state_->value();
}
template <class T>
typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
throwIfInvalid();
- return obj_->value();
+ return state_->value();
}
template <class T>
Try<T>& Future<T>::getTry() {
throwIfInvalid();
- return obj_->getTry();
+ return state_->getTry();
}
template <class T>
template <typename Executor>
inline Future<T> Future<T>::via(Executor* executor) {
throwIfInvalid();
-
- folly::MoveWrapper<Promise<T>> p;
- auto f = p->getFuture();
-
- setContinuation([executor, p](Try<T>&& t) mutable {
- folly::MoveWrapper<Try<T>> tt(std::move(t));
- executor->add([p, tt]() mutable {
- p->fulfilTry(std::move(*tt));
- });
- });
-
+ auto f = then([=](Try<T>&& t) {
+ MoveWrapper<Promise<T>> promise;
+ MoveWrapper<Try<T>> tw(std::move(t));
+ auto f2 = promise->getFuture();
+ executor->add([=]() mutable { promise->fulfilTry(std::move(*tw)); });
+ return f2;
+ });
+ f.deactivate();
return f;
}
-template <class T>
-template <typename Executor>
-inline void Future<T>::executeWith(
- Executor* executor, Promise<T>&& cont_promise) {
- throwIfInvalid();
-
- folly::MoveWrapper<Promise<T>> p(std::move(cont_promise));
-
- setContinuation([executor, p](Try<T>&& t) mutable {
- folly::MoveWrapper<Try<T>> tt(std::move(t));
- executor->add([p, tt]() mutable {
- p->fulfilTry(std::move(*tt));
- });
- });
-}
-
template <class T>
bool Future<T>::isReady() const {
throwIfInvalid();
- return obj_->ready();
+ return state_->ready();
}
// makeFuture
return std::move(f);
}
+template <class T>
+Future<T> makeFuture(Try<T>&& t) {
+ try {
+ return makeFuture<T>(std::move(t.value()));
+ } catch (...) {
+ return makeFuture<T>(std::current_exception());
+ }
+}
+
+template <>
+inline Future<void> makeFuture(Try<void>&& t) {
+ try {
+ t.throwIfFailed();
+ return makeFuture();
+ } catch (...) {
+ return makeFuture<void>(std::current_exception());
+ }
+}
+
// when (variadic)
template <typename... Fs>
typename std::iterator_traits<InputIterator>::value_type::value_type T;
auto n = std::distance(first, last);
- if (n == 0)
- return makeFuture<std::vector<Try<T>>>({});
+ if (n == 0) {
+ return makeFuture(std::vector<Try<T>>());
+ }
auto ctx = new detail::WhenAllContext<T>();
for (size_t i = 0; first != last; ++first, ++i) {
auto& f = *first;
- f.setContinuation([ctx, i](Try<T>&& t) {
+ f.setCallback_([ctx, i](Try<T>&& t) {
ctx->results[i] = std::move(t);
if (++ctx->count == ctx->total) {
ctx->p.setValue(std::move(ctx->results));
for (size_t i = 0; first != last; first++, i++) {
auto& f = *first;
- f.setContinuation([i, ctx](Try<T>&& t) {
+ f.setCallback_([i, ctx](Try<T>&& t) {
if (!ctx->done.exchange(true)) {
ctx->p.setValue(std::make_pair(i, std::move(t)));
}
return ctx->p.getFuture();
}
+template <typename T>
+Future<T>
+waitWithSemaphore(Future<T>&& f) {
+ LifoSem sem;
+ auto done = f.then([&](Try<T> &&t) {
+ sem.post();
+ return std::move(t.value());
+ });
+ sem.wait();
+ return done;
+}
+
+template<>
+inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
+ LifoSem sem;
+ auto done = f.then([&](Try<void> &&t) {
+ sem.post();
+ t.value();
+ });
+ sem.wait();
+ return done;
+}
+
+template <typename T, class Duration>
+Future<T>
+waitWithSemaphore(Future<T>&& f, Duration timeout) {
+ auto sem = std::make_shared<LifoSem>();
+ auto done = f.then([sem](Try<T> &&t) {
+ sem->post();
+ return std::move(t.value());
+ });
+ std::thread t([sem, timeout](){
+ std::this_thread::sleep_for(timeout);
+ sem->shutdown();
+ });
+ t.detach();
+ try {
+ sem->wait();
+ } catch (ShutdownSemError & ign) { }
+ return done;
+}
+
+template <class Duration>
+Future<void>
+waitWithSemaphore(Future<void>&& f, Duration timeout) {
+ auto sem = std::make_shared<LifoSem>();
+ auto done = f.then([sem](Try<void> &&t) {
+ sem->post();
+ t.value();
+ });
+ std::thread t([sem, timeout](){
+ std::this_thread::sleep_for(timeout);
+ sem->shutdown();
+ });
+ t.detach();
+ try {
+ sem->wait();
+ } catch (ShutdownSemError & ign) { }
+ return done;
+}
+
}}
// I haven't included a Future<T&> specialization because I don't forsee us