(Wangle) Make via behave more like gate
[folly.git] / folly / wangle / Future-inl.h
index 6b5907dc7693a86cb2fcad34b1f7f257a2fb6521..f404067b8afab3a39176f4addc2885933c303a63 100644 (file)
 
 #pragma once
 
-#include "detail.h"
+#include <chrono>
+#include <thread>
+
+#include <folly/wangle/detail/State.h>
+#include <folly/Baton.h>
 
 namespace folly { namespace wangle {
 
@@ -31,39 +35,40 @@ struct isFuture<Future<T> > {
 };
 
 template <class T>
-Future<T>::Future(Future<T>&& other) : obj_(other.obj_) {
-  other.obj_ = nullptr;
+Future<T>::Future(Future<T>&& other) noexcept : state_(nullptr) {
+  *this = std::move(other);
 }
 
 template <class T>
 Future<T>& Future<T>::operator=(Future<T>&& other) {
-  std::swap(obj_, other.obj_);
+  std::swap(state_, other.state_);
   return *this;
 }
 
 template <class T>
 Future<T>::~Future() {
-  if (obj_) {
-    if (obj_->ready()) {
-      delete obj_;
-    } else {
-      setContinuation([](Try<T>&&) {}); // detach
-    }
+  detach();
+}
+
+template <class T>
+void Future<T>::detach() {
+  if (state_) {
+    state_->detachFuture();
+    state_ = nullptr;
   }
 }
 
 template <class T>
 void Future<T>::throwIfInvalid() const {
-  if (!obj_)
+  if (!state_)
     throw NoState();
 }
 
 template <class T>
 template <class F>
-void Future<T>::setContinuation(F&& func) {
+void Future<T>::setCallback_(F&& func) {
   throwIfInvalid();
-  obj_->setContinuation(std::move(func));
-  obj_ = nullptr;
+  state_->setCallback(std::move(func));
 }
 
 template <class T>
@@ -90,10 +95,10 @@ Future<T>::then(F&& func) {
      sophisticated that avoids making a new Future object when it can, as an
      optimization. But this is correct.
 
-     obj_ can't be moved, it is explicitly disallowed (as is copying). But
+     state_ can't be moved, it is explicitly disallowed (as is copying). But
      if there's ever a reason to allow it, this is one place that makes that
      assumption and would need to be fixed. We use a standard shared pointer
-     for obj_ (by copying it in), which means in essence obj holds a shared
+     for state_ (by copying it in), which means in essence obj holds a shared
      pointer to itself.  But this shouldn't leak because Promise will not
      outlive the continuation, because Promise will setException() with a
      broken Promise if it is destructed before completed. We could use a
@@ -105,17 +110,17 @@ Future<T>::then(F&& func) {
      We have to move in the Promise and func using the MoveWrapper
      hack. (func could be copied but it's a big drag on perf).
 
-     Two subtle but important points about this design. FutureObject has no
+     Two subtle but important points about this design. detail::State has no
      back pointers to Future or Promise, so if Future or Promise get moved
      (and they will be moved in performant code) we don't have to do
      anything fancy. And because we store the continuation in the
-     FutureObject, not in the Future, we can execute the continuation even
+     detail::State, not in the Future, we can execute the continuation even
      after the Future has gone out of scope. This is an intentional design
      decision. It is likely we will want to be able to cancel a continuation
      in some circumstances, but I think it should be explicit not implicit
      in the destruction of the Future used to create it.
      */
-  setContinuation(
+  setCallback_(
     [p, funcm](Try<T>&& t) mutable {
       p->fulfil([&]() {
           return (*funcm)(std::move(t));
@@ -142,12 +147,12 @@ Future<T>::then(F&& func) {
   // grab the Future now before we lose our handle on the Promise
   auto f = p->getFuture();
 
-  setContinuation(
+  setCallback_(
     [p, funcm](Try<T>&& t) mutable {
       try {
         auto f2 = (*funcm)(std::move(t));
         // that didn't throw, now we can steal p
-        f2.setContinuation([p](Try<B>&& b) mutable {
+        f2.setCallback_([p](Try<B>&& b) mutable {
             p->fulfilTry(std::move(b));
           });
       } catch (...) {
@@ -167,61 +172,69 @@ template <class T>
 typename std::add_lvalue_reference<T>::type Future<T>::value() {
   throwIfInvalid();
 
-  return obj_->value();
+  return state_->value();
 }
 
 template <class T>
 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
   throwIfInvalid();
 
-  return obj_->value();
+  return state_->value();
 }
 
 template <class T>
 Try<T>& Future<T>::getTry() {
   throwIfInvalid();
 
-  return obj_->getTry();
+  return state_->getTry();
 }
 
 template <class T>
 template <typename Executor>
-inline Future<T> Future<T>::executeWithSameThread(Executor* executor) {
+inline Future<T> Future<T>::via(Executor* executor) {
   throwIfInvalid();
-
-  folly::MoveWrapper<Promise<T>> p;
-  auto f = p->getFuture();
-
-  setContinuation([executor, p](Try<T>&& t) mutable {
-      folly::MoveWrapper<Try<T>> tt(std::move(t));
-      executor->add([p, tt]() mutable {
-          p->fulfilTry(std::move(*tt));
-        });
-    });
+  MoveWrapper<Promise<T>> promise;
+
+  auto f = promise->getFuture();
+  // We are obligated to return a cold future.
+  f.deactivate();
+  // But we also need to make this one cold for via to at least work some of
+  // the time. (see below)
+  deactivate();
+
+  then([=](Try<T>&& t) mutable {
+    MoveWrapper<Try<T>> tw(std::move(t));
+    // There is a race here.
+    // When the promise is fulfilled, and the future is still inactive, when
+    // the future is activated (e.g. by destruction) the callback will happen
+    // in that context, not in the intended context (which has already left
+    // the building).
+    //
+    // Currently, this will work fine because all the temporaries are
+    // destructed in an order that is compatible with this implementation:
+    //
+    //   makeFuture().via(x).then(a).then(b);
+    //
+    // However, this will not work reliably:
+    //
+    //   auto f2 = makeFuture().via(x);
+    //   f2.then(a).then(b);
+    //
+    // Because the first temporary is destructed on the first line, and the
+    // executor is fed. But by the time f2 is destructed, the executor
+    // may have already fulfilled the promise on the other thread.
+    //
+    // TODO(#4920689) fix it
+    executor->add([=]() mutable { promise->fulfilTry(std::move(*tw)); });
+  });
 
   return f;
 }
 
-template <class T>
-template <typename Executor>
-inline void Future<T>::executeWith(
-    Executor* executor, Promise<T>&& cont_promise) {
-  throwIfInvalid();
-
-  folly::MoveWrapper<Promise<T>> p(std::move(cont_promise));
-
-  setContinuation([executor, p](Try<T>&& t) mutable {
-      folly::MoveWrapper<Try<T>> tt(std::move(t));
-      executor->add([p, tt]() mutable {
-          p->fulfilTry(std::move(*tt));
-        });
-    });
-}
-
 template <class T>
 bool Future<T>::isReady() const {
   throwIfInvalid();
-  return obj_->ready();
+  return state_->ready();
 }
 
 // makeFuture
@@ -271,7 +284,8 @@ Future<T> makeFuture(std::exception_ptr const& e) {
 }
 
 template <class T, class E>
-typename std::enable_if<std::is_base_of<std::exception, E>::value, Future<T>>::type
+typename std::enable_if<std::is_base_of<std::exception, E>::value,
+                        Future<T>>::type
 makeFuture(E const& e) {
   Promise<T> p;
   auto f = p.getFuture();
@@ -279,6 +293,25 @@ makeFuture(E const& e) {
   return std::move(f);
 }
 
+template <class T>
+Future<T> makeFuture(Try<T>&& t) {
+  try {
+    return makeFuture<T>(std::move(t.value()));
+  } catch (...) {
+    return makeFuture<T>(std::current_exception());
+  }
+}
+
+template <>
+inline Future<void> makeFuture(Try<void>&& t) {
+  try {
+    t.throwIfFailed();
+    return makeFuture();
+  } catch (...) {
+    return makeFuture<void>(std::current_exception());
+  }
+}
+
 // when (variadic)
 
 template <typename... Fs>
@@ -307,8 +340,9 @@ whenAll(InputIterator first, InputIterator last)
     typename std::iterator_traits<InputIterator>::value_type::value_type T;
 
   auto n = std::distance(first, last);
-  if (n == 0)
-    return makeFuture<std::vector<Try<T>>>({});
+  if (n == 0) {
+    return makeFuture(std::vector<Try<T>>());
+  }
 
   auto ctx = new detail::WhenAllContext<T>();
 
@@ -319,7 +353,7 @@ whenAll(InputIterator first, InputIterator last)
 
   for (size_t i = 0; first != last; ++first, ++i) {
      auto& f = *first;
-     f.setContinuation([ctx, i](Try<T>&& t) {
+     f.setCallback_([ctx, i](Try<T>&& t) {
          ctx->results[i] = std::move(t);
          if (++ctx->count == ctx->total) {
            ctx->p.setValue(std::move(ctx->results));
@@ -346,7 +380,7 @@ whenAny(InputIterator first, InputIterator last) {
 
   for (size_t i = 0; first != last; first++, i++) {
     auto& f = *first;
-    f.setContinuation([i, ctx](Try<T>&& t) {
+    f.setCallback_([i, ctx](Try<T>&& t) {
       if (!ctx->done.exchange(true)) {
         ctx->p.setValue(std::make_pair(i, std::move(t)));
       }
@@ -402,6 +436,65 @@ whenN(InputIterator first, InputIterator last, size_t n) {
   return ctx->p.getFuture();
 }
 
+template <typename T>
+Future<T>
+waitWithSemaphore(Future<T>&& f) {
+  Baton<> baton;
+  auto done = f.then([&](Try<T> &&t) {
+    baton.post();
+    return std::move(t.value());
+  });
+  baton.wait();
+  while (!done.isReady()) {
+    // There's a race here between the return here and the actual finishing of
+    // the future. f is completed, but the setup may not have finished on done
+    // after the baton has posted.
+    std::this_thread::yield();
+  }
+  return done;
+}
+
+template<>
+inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
+  Baton<> baton;
+  auto done = f.then([&](Try<void> &&t) {
+    baton.post();
+    t.value();
+  });
+  baton.wait();
+  while (!done.isReady()) {
+    // There's a race here between the return here and the actual finishing of
+    // the future. f is completed, but the setup may not have finished on done
+    // after the baton has posted.
+    std::this_thread::yield();
+  }
+  return done;
+}
+
+template <typename T, class Duration>
+Future<T>
+waitWithSemaphore(Future<T>&& f, Duration timeout) {
+  auto baton = std::make_shared<Baton<>>();
+  auto done = f.then([baton](Try<T> &&t) {
+    baton->post();
+    return std::move(t.value());
+  });
+  baton->timed_wait(std::chrono::system_clock::now() + timeout);
+  return done;
+}
+
+template <class Duration>
+Future<void>
+waitWithSemaphore(Future<void>&& f, Duration timeout) {
+  auto baton = std::make_shared<Baton<>>();
+  auto done = f.then([baton](Try<void> &&t) {
+    baton->post();
+    t.value();
+  });
+  baton->timed_wait(std::chrono::system_clock::now() + timeout);
+  return done;
+}
+
 }}
 
 // I haven't included a Future<T&> specialization because I don't forsee us