Adding DeferredExecutor to support deferred execution of tasks on a future returned...
[folly.git] / folly / futures / Future-inl.h
index 17f472318db48006140283690b3999b94b00414f..1baee146817ac679e9c72c210c5ff312c8cd8df7 100644 (file)
@@ -200,17 +200,6 @@ T const&& FutureBase<T>::value() const&& {
   return std::move(core_->getTry().value());
 }
 
-template <class T>
-inline Future<T> FutureBase<T>::via(Executor* executor, int8_t priority) && {
-  throwIfInvalid();
-
-  setExecutor(executor, priority);
-
-  auto newFuture = Future<T>(core_);
-  core_ = nullptr;
-  return newFuture;
-}
-
 template <class T>
 bool FutureBase<T>::isReady() const {
   throwIfInvalid();
@@ -478,6 +467,75 @@ SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
   return *this;
 }
 
+template <class T>
+void SemiFuture<T>::boost_() {
+  // If a SemiFuture has an executor it should be deferred, so boost it
+  if (auto e = this->getExecutor()) {
+    // We know in a SemiFuture that if we have an executor it should be
+    // DeferredExecutor. Verify this in debug mode.
+    DCHECK(dynamic_cast<DeferredExecutor*>(e));
+
+    auto ka = static_cast<DeferredExecutor*>(e)->getKeepAliveToken();
+    static_cast<DeferredExecutor*>(e)->boost();
+  }
+}
+
+template <class T>
+inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
+  throwIfInvalid();
+
+  // If current executor is deferred, boost block to ensure that work
+  // progresses and is run on the new executor.
+  auto oldExecutor = this->getExecutor();
+  if (oldExecutor && executor && (executor != oldExecutor)) {
+    // We know in a SemiFuture that if we have an executor it should be
+    // DeferredExecutor. Verify this in debug mode.
+    DCHECK(dynamic_cast<DeferredExecutor*>(this->getExecutor()));
+    if (static_cast<DeferredExecutor*>(oldExecutor)) {
+      executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() {
+        static_cast<DeferredExecutor*>(oldExecutorKA.get())->boost();
+      });
+    }
+  }
+
+  this->setExecutor(executor, priority);
+
+  auto newFuture = Future<T>(this->core_);
+  this->core_ = nullptr;
+  return newFuture;
+}
+
+template <class T>
+template <typename F>
+SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
+SemiFuture<T>::defer(F&& func) && {
+  // If we already have a deferred executor, use it, otherwise create one
+  auto defKeepAlive = this->getExecutor()
+      ? this->getExecutor()->getKeepAliveToken()
+      : DeferredExecutor::create();
+  auto e = defKeepAlive.get();
+  // We know in a SemiFuture that if we have an executor it should be
+  // DeferredExecutor (either it was that way before, or we just created it).
+  // Verify this in debug mode.
+  DCHECK(dynamic_cast<DeferredExecutor*>(e));
+  // Convert to a folly::future with a deferred executor
+  // Will be low-cost if this is not a new executor as via optimises for that
+  // case
+  auto sf =
+      std::move(*this)
+          .via(defKeepAlive.get())
+          // Then add the work, with a wrapper function that captures the
+          // keepAlive so the executor is destroyed at the right time.
+          .then(
+              DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func)))
+          // Finally, convert back o a folly::SemiFuture to hide the executor
+          .semi();
+  // Carry deferred executor through chain as constructor from Future will
+  // nullify it
+  sf.setExecutor(e);
+  return sf;
+}
+
 template <class T>
 Future<T> Future<T>::makeEmpty() {
   return Future<T>(futures::detail::EmptyConstruct{});
@@ -539,6 +597,17 @@ typename std::
   });
 }
 
+template <class T>
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
+  this->throwIfInvalid();
+
+  this->setExecutor(executor, priority);
+
+  auto newFuture = Future<T>(this->core_);
+  this->core_ = nullptr;
+  return newFuture;
+}
+
 template <class T>
 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
   this->throwIfInvalid();
@@ -1269,6 +1338,14 @@ Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
 namespace futures {
 namespace detail {
 
+template <class T>
+void doBoost(folly::Future<T>& /* usused */) {}
+
+template <class T>
+void doBoost(folly::SemiFuture<T>& f) {
+  f.boost_();
+}
+
 template <class FutureType, typename T = typename FutureType::value_type>
 void waitImpl(FutureType& f) {
   // short-circuit if there's nothing to do
@@ -1278,6 +1355,7 @@ void waitImpl(FutureType& f) {
 
   FutureBatonType baton;
   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
+  doBoost(f);
   baton.wait();
   assert(f.isReady());
 }
@@ -1296,6 +1374,7 @@ void waitImpl(FutureType& f, Duration dur) {
     promise.setTry(std::move(t));
     baton->post();
   });
+  doBoost(f);
   f = std::move(ret);
   if (baton->timed_wait(dur)) {
     assert(f.isReady());