Adding DeferredExecutor to support deferred execution of tasks on a future returned...
authorLee Howes <lwh@fb.com>
Tue, 31 Oct 2017 21:20:08 +0000 (14:20 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Tue, 31 Oct 2017 21:23:59 +0000 (14:23 -0700)
Summary: This adds a DeferredExecutor type that is boostable, which means that it follows the expectation we expect for C++20 that .then and get will trigger boost-blocking behaviour and ensure work makes progress. Unlike discussions for C++ this adds boost blocking to folly only in the specific case of deferring work to run on the caller's executor, to avoid the necessity to pass an executor into a library purely to ensure that finalisation work and future completion occor on a well-defined exewcutor.

Reviewed By: yfeldblum

Differential Revision: D5828743

fbshipit-source-id: 9a4b69d7deaa33c3cecd6546651b99cc99f0c286

folly/Executor.h
folly/futures/Future-inl.h
folly/futures/Future-pre.h
folly/futures/Future.cpp
folly/futures/Future.h
folly/futures/test/SemiFutureTest.cpp

index 6886621..a2ac0d6 100644 (file)
@@ -59,6 +59,10 @@ class Executor {
       return executor_ != nullptr;
     }
 
+    Executor* get() const {
+      return executor_.get();
+    }
+
    private:
     friend class Executor;
     explicit KeepAlive(folly::Executor* executor) : executor_(executor) {}
index 17f4723..1baee14 100644 (file)
@@ -200,17 +200,6 @@ T const&& FutureBase<T>::value() const&& {
   return std::move(core_->getTry().value());
 }
 
-template <class T>
-inline Future<T> FutureBase<T>::via(Executor* executor, int8_t priority) && {
-  throwIfInvalid();
-
-  setExecutor(executor, priority);
-
-  auto newFuture = Future<T>(core_);
-  core_ = nullptr;
-  return newFuture;
-}
-
 template <class T>
 bool FutureBase<T>::isReady() const {
   throwIfInvalid();
@@ -478,6 +467,75 @@ SemiFuture<T>& SemiFuture<T>::operator=(Future<T>&& other) noexcept {
   return *this;
 }
 
+template <class T>
+void SemiFuture<T>::boost_() {
+  // If a SemiFuture has an executor it should be deferred, so boost it
+  if (auto e = this->getExecutor()) {
+    // We know in a SemiFuture that if we have an executor it should be
+    // DeferredExecutor. Verify this in debug mode.
+    DCHECK(dynamic_cast<DeferredExecutor*>(e));
+
+    auto ka = static_cast<DeferredExecutor*>(e)->getKeepAliveToken();
+    static_cast<DeferredExecutor*>(e)->boost();
+  }
+}
+
+template <class T>
+inline Future<T> SemiFuture<T>::via(Executor* executor, int8_t priority) && {
+  throwIfInvalid();
+
+  // If current executor is deferred, boost block to ensure that work
+  // progresses and is run on the new executor.
+  auto oldExecutor = this->getExecutor();
+  if (oldExecutor && executor && (executor != oldExecutor)) {
+    // We know in a SemiFuture that if we have an executor it should be
+    // DeferredExecutor. Verify this in debug mode.
+    DCHECK(dynamic_cast<DeferredExecutor*>(this->getExecutor()));
+    if (static_cast<DeferredExecutor*>(oldExecutor)) {
+      executor->add([oldExecutorKA = oldExecutor->getKeepAliveToken()]() {
+        static_cast<DeferredExecutor*>(oldExecutorKA.get())->boost();
+      });
+    }
+  }
+
+  this->setExecutor(executor, priority);
+
+  auto newFuture = Future<T>(this->core_);
+  this->core_ = nullptr;
+  return newFuture;
+}
+
+template <class T>
+template <typename F>
+SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
+SemiFuture<T>::defer(F&& func) && {
+  // If we already have a deferred executor, use it, otherwise create one
+  auto defKeepAlive = this->getExecutor()
+      ? this->getExecutor()->getKeepAliveToken()
+      : DeferredExecutor::create();
+  auto e = defKeepAlive.get();
+  // We know in a SemiFuture that if we have an executor it should be
+  // DeferredExecutor (either it was that way before, or we just created it).
+  // Verify this in debug mode.
+  DCHECK(dynamic_cast<DeferredExecutor*>(e));
+  // Convert to a folly::future with a deferred executor
+  // Will be low-cost if this is not a new executor as via optimises for that
+  // case
+  auto sf =
+      std::move(*this)
+          .via(defKeepAlive.get())
+          // Then add the work, with a wrapper function that captures the
+          // keepAlive so the executor is destroyed at the right time.
+          .then(
+              DeferredExecutor::wrap(std::move(defKeepAlive), std::move(func)))
+          // Finally, convert back o a folly::SemiFuture to hide the executor
+          .semi();
+  // Carry deferred executor through chain as constructor from Future will
+  // nullify it
+  sf.setExecutor(e);
+  return sf;
+}
+
 template <class T>
 Future<T> Future<T>::makeEmpty() {
   return Future<T>(futures::detail::EmptyConstruct{});
@@ -539,6 +597,17 @@ typename std::
   });
 }
 
+template <class T>
+inline Future<T> Future<T>::via(Executor* executor, int8_t priority) && {
+  this->throwIfInvalid();
+
+  this->setExecutor(executor, priority);
+
+  auto newFuture = Future<T>(this->core_);
+  this->core_ = nullptr;
+  return newFuture;
+}
+
 template <class T>
 inline Future<T> Future<T>::via(Executor* executor, int8_t priority) & {
   this->throwIfInvalid();
@@ -1269,6 +1338,14 @@ Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
 namespace futures {
 namespace detail {
 
+template <class T>
+void doBoost(folly::Future<T>& /* usused */) {}
+
+template <class T>
+void doBoost(folly::SemiFuture<T>& f) {
+  f.boost_();
+}
+
 template <class FutureType, typename T = typename FutureType::value_type>
 void waitImpl(FutureType& f) {
   // short-circuit if there's nothing to do
@@ -1278,6 +1355,7 @@ void waitImpl(FutureType& f) {
 
   FutureBatonType baton;
   f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
+  doBoost(f);
   baton.wait();
   assert(f.isReady());
 }
@@ -1296,6 +1374,7 @@ void waitImpl(FutureType& f, Duration dur) {
     promise.setTry(std::move(t));
     baton->post();
   });
+  doBoost(f);
   f = std::move(ret);
   if (baton->timed_wait(dur)) {
     assert(f.isReady());
index 86514b0..1fd0ebe 100644 (file)
@@ -153,6 +153,106 @@ struct Extract<R (&)(Args...)> {
   typedef typename ArgType<Args...>::FirstArg FirstArg;
 };
 
+/**
+ * Defer work until executor is actively boosted.
+ */
+class DeferredExecutor final : public Executor {
+ public:
+  template <typename Class, typename F>
+  struct DeferredWorkWrapper;
+
+  /**
+   * Work wrapper class to capture the keepalive and forward the argument
+   * list to the captured function.
+   */
+  template <typename F, typename R, typename... Args>
+  struct DeferredWorkWrapper<F, R (F::*)(Args...) const> {
+    R operator()(Args... args) {
+      return func(std::forward<Args>(args)...);
+    }
+
+    Executor::KeepAlive a;
+    F func;
+  };
+
+  /**
+   * Construction is private to ensure that creation and deletion are
+   * symmetric
+   */
+  static KeepAlive create() {
+    std::unique_ptr<futures::detail::DeferredExecutor> devb{
+        new futures::detail::DeferredExecutor{}};
+    auto keepAlive = devb->getKeepAliveToken();
+    devb.release();
+    return keepAlive;
+  }
+
+  /// Enqueue a function to executed by this executor. This is not thread-safe.
+  void add(Func func) override {
+    // If we already have a function, wrap and chain. Otherwise assign.
+    if (func_) {
+      func_ = [oldFunc = std::move(func_), func = std::move(func)]() mutable {
+        oldFunc();
+        func();
+      };
+    } else {
+      func_ = std::move(func);
+    }
+  }
+
+  // Boost is like drive for certain types of deferred work
+  // Unlike drive it is safe to run on another executor because it
+  // will only be implemented on deferred-safe executors
+  void boost() {
+    // Ensure that the DeferredExecutor outlives its run operation
+    ++keepAliveCount_;
+    SCOPE_EXIT {
+      releaseAndTryFree();
+    };
+
+    // Drain the executor
+    while (auto func = std::move(func_)) {
+      func();
+    }
+  }
+
+  KeepAlive getKeepAliveToken() override {
+    keepAliveAcquire();
+    return makeKeepAlive();
+  }
+
+  ~DeferredExecutor() = default;
+
+  template <class F>
+  static auto wrap(Executor::KeepAlive keepAlive, F&& func)
+      -> DeferredWorkWrapper<F, decltype(&F::operator())> {
+    return DeferredExecutor::DeferredWorkWrapper<F, decltype(&F::operator())>{
+        std::move(keepAlive), std::forward<F>(func)};
+  }
+
+ protected:
+  void keepAliveAcquire() override {
+    ++keepAliveCount_;
+  }
+
+  void keepAliveRelease() override {
+    releaseAndTryFree();
+  }
+
+  void releaseAndTryFree() {
+    --keepAliveCount_;
+    if (keepAliveCount_ == 0) {
+      delete this;
+    }
+  }
+
+ private:
+  Func func_;
+  ssize_t keepAliveCount_{0};
+
+  DeferredExecutor() = default;
+};
+
 } // namespace detail
 } // namespace futures
 
index c4f19a7..49e4c02 100644 (file)
@@ -35,7 +35,8 @@ template class Future<std::string>;
 template class Future<double>;
 } // namespace folly
 
-namespace folly { namespace futures {
+namespace folly {
+namespace futures {
 
 Future<Unit> sleep(Duration dur, Timekeeper* tk) {
   std::shared_ptr<Timekeeper> tks;
index 3995747..5cd5436 100644 (file)
@@ -25,6 +25,7 @@
 
 #include <folly/Optional.h>
 #include <folly/Portability.h>
+#include <folly/ScopeGuard.h>
 #include <folly/Try.h>
 #include <folly/Utility.h>
 #include <folly/executors/DrivableExecutor.h>
@@ -95,30 +96,6 @@ class FutureBase {
   T&& value() &&;
   T const&& value() const&&;
 
-  /// Returns an inactive Future which will call back on the other side of
-  /// executor (when it is activated).
-  ///
-  /// NB remember that Futures activate when they destruct. This is good,
-  /// it means that this will work:
-  ///
-  ///   f.via(e).then(a).then(b);
-  ///
-  /// a and b will execute in the same context (the far side of e), because
-  /// the Future (temporary variable) created by via(e) does not call back
-  /// until it destructs, which is after then(a) and then(b) have been wired
-  /// up.
-  ///
-  /// But this is still racy:
-  ///
-  ///   f = f.via(e).then(a);
-  ///   f.then(b);
-  // The ref-qualifier allows for `this` to be moved out so we
-  // don't get access-after-free situations in chaining.
-  // https://akrzemi1.wordpress.com/2014/06/02/ref-qualifiers/
-  inline Future<T> via(
-      Executor* executor,
-      int8_t priority = Executor::MID_PRI) &&;
-
   /** True when the result (or exception) is ready. */
   bool isReady() const;
 
@@ -218,6 +195,7 @@ template <class T>
 class SemiFuture : private futures::detail::FutureBase<T> {
  private:
   using Base = futures::detail::FutureBase<T>;
+  using DeferredExecutor = futures::detail::DeferredExecutor;
 
  public:
   static SemiFuture<T> makeEmpty(); // equivalent to moved-from
@@ -262,7 +240,6 @@ class SemiFuture : private futures::detail::FutureBase<T> {
   using Base::raise;
   using Base::setCallback_;
   using Base::value;
-  using Base::via;
 
   SemiFuture& operator=(SemiFuture const&) = delete;
   SemiFuture& operator=(SemiFuture&&) noexcept;
@@ -290,11 +267,57 @@ class SemiFuture : private futures::detail::FutureBase<T> {
   /// Overload of wait(Duration) for rvalue Futures
   SemiFuture<T>&& wait(Duration) &&;
 
+  /// Returns an inactive Future which will call back on the other side of
+  /// executor (when it is activated).
+  ///
+  /// NB remember that Futures activate when they destruct. This is good,
+  /// it means that this will work:
+  ///
+  ///   f.via(e).then(a).then(b);
+  ///
+  /// a and b will execute in the same context (the far side of e), because
+  /// the Future (temporary variable) created by via(e) does not call back
+  /// until it destructs, which is after then(a) and then(b) have been wired
+  /// up.
+  ///
+  /// But this is still racy:
+  ///
+  ///   f = f.via(e).then(a);
+  ///   f.then(b);
+  // The ref-qualifier allows for `this` to be moved out so we
+  // don't get access-after-free situations in chaining.
+  // https://akrzemi1.wordpress.com/2014/06/02/ref-qualifiers/
+  inline Future<T> via(
+      Executor* executor,
+      int8_t priority = Executor::MID_PRI) &&;
+
+  /**
+   * Defer work to run on the consumer of the future.
+   * This work will be run eithe ron an executor that the caller sets on the
+   * SemiFuture, or inline with the call to .get().
+   * NB: This is a custom method because boost-blocking executors is a
+   * special-case for work deferral in folly. With more general boost-blocking
+   * support all executors would boost block and we would simply use some form
+   * of driveable executor here.
+   */
+  template <typename F>
+  SemiFuture<typename futures::detail::callableResult<T, F>::Return::value_type>
+  defer(F&& func) &&;
+
+  // Public as for setCallback_
+  // Ensure that a boostable executor performs work to chain deferred work
+  // cleanly
+  void boost_();
+
  private:
   template <class>
   friend class futures::detail::FutureBase;
+  template <class>
+  friend class SemiFuture;
 
   using typename Base::corePtr;
+  using Base::setExecutor;
+  using Base::throwIfInvalid;
 
   template <class T2>
   friend SemiFuture<T2> makeSemiFuture(Try<T2>&&);
@@ -374,7 +397,6 @@ class Future : private futures::detail::FutureBase<T> {
   using Base::raise;
   using Base::setCallback_;
   using Base::value;
-  using Base::via;
 
   static Future<T> makeEmpty(); // equivalent to moved-from
 
@@ -401,6 +423,30 @@ class Future : private futures::detail::FutureBase<T> {
       enable_if<isFuture<F>::value, Future<typename isFuture<T>::Inner>>::type
       unwrap();
 
+  /// Returns an inactive Future which will call back on the other side of
+  /// executor (when it is activated).
+  ///
+  /// NB remember that Futures activate when they destruct. This is good,
+  /// it means that this will work:
+  ///
+  ///   f.via(e).then(a).then(b);
+  ///
+  /// a and b will execute in the same context (the far side of e), because
+  /// the Future (temporary variable) created by via(e) does not call back
+  /// until it destructs, which is after then(a) and then(b) have been wired
+  /// up.
+  ///
+  /// But this is still racy:
+  ///
+  ///   f = f.via(e).then(a);
+  ///   f.then(b);
+  // The ref-qualifier allows for `this` to be moved out so we
+  // don't get access-after-free situations in chaining.
+  // https://akrzemi1.wordpress.com/2014/06/02/ref-qualifiers/
+  inline Future<T> via(
+      Executor* executor,
+      int8_t priority = Executor::MID_PRI) &&;
+
   /// This variant creates a new future, where the ref-qualifier && version
   /// moves `this` out. This one is less efficient but avoids confusing users
   /// when "return f.via(x);" fails.
@@ -693,7 +739,11 @@ class Future : private futures::detail::FutureBase<T> {
   friend class futures::detail::FutureBase;
   template <class>
   friend class Future;
+  template <class>
+  friend class SemiFuture;
 
+  using Base::setExecutor;
+  using Base::throwIfInvalid;
   using typename Base::corePtr;
 
   explicit Future(corePtr obj) : Base(obj) {}
index 29afb08..9531483 100644 (file)
@@ -216,3 +216,86 @@ TEST(SemiFuture, MakeFutureFromSemiFutureLValue) {
   ASSERT_EQ(future.value(), 42);
   ASSERT_EQ(result, 42);
 }
+
+TEST(SemiFuture, SimpleDefer) {
+  std::atomic<int> innerResult{0};
+  Promise<folly::Unit> p;
+  auto f = p.getFuture();
+  auto sf = std::move(f).semi().defer([&]() { innerResult = 17; });
+  p.setValue();
+  // Run "F" here inline in the calling thread
+  std::move(sf).get();
+  ASSERT_EQ(innerResult, 17);
+}
+
+TEST(SemiFuture, DeferWithVia) {
+  std::atomic<int> innerResult{0};
+  EventBase e2;
+  Promise<folly::Unit> p;
+  auto f = p.getFuture();
+  auto sf = std::move(f).semi().defer([&]() { innerResult = 17; });
+  // Run "F" here inline in the calling thread
+  auto tf = std::move(sf).via(&e2);
+  p.setValue();
+  tf.getVia(&e2);
+  ASSERT_EQ(innerResult, 17);
+}
+
+TEST(SemiFuture, ChainingDefertoThen) {
+  std::atomic<int> innerResult{0};
+  std::atomic<int> result{0};
+  EventBase e2;
+  Promise<folly::Unit> p;
+  auto f = p.getFuture();
+  auto sf = std::move(f).semi().defer([&]() { innerResult = 17; });
+  // Run "F" here inline in a task running on the eventbase
+  auto tf = std::move(sf).via(&e2).then([&]() { result = 42; });
+  p.setValue();
+  tf.getVia(&e2);
+  ASSERT_EQ(innerResult, 17);
+  ASSERT_EQ(result, 42);
+}
+
+TEST(SemiFuture, SimpleDeferWithValue) {
+  std::atomic<int> innerResult{0};
+  Promise<int> p;
+  auto f = p.getFuture();
+  auto sf = std::move(f).semi().defer([&](int a) { innerResult = a; });
+  p.setValue(7);
+  // Run "F" here inline in the calling thread
+  std::move(sf).get();
+  ASSERT_EQ(innerResult, 7);
+}
+
+TEST(SemiFuture, ChainingDefertoThenWithValue) {
+  std::atomic<int> innerResult{0};
+  std::atomic<int> result{0};
+  EventBase e2;
+  Promise<int> p;
+  auto f = p.getFuture();
+  auto sf = std::move(f).semi().defer([&](int a) {
+    innerResult = a;
+    return a;
+  });
+  // Run "F" here inline in a task running on the eventbase
+  auto tf = std::move(sf).via(&e2).then([&](int a) { result = a; });
+  p.setValue(7);
+  tf.getVia(&e2);
+  ASSERT_EQ(innerResult, 7);
+  ASSERT_EQ(result, 7);
+}
+
+TEST(SemiFuture, MakeSemiFutureFromFutureWithTry) {
+  Promise<int> p;
+  auto f = p.getFuture();
+  auto sf = std::move(f).semi().defer([&](Try<int> t) {
+    if (auto err = t.tryGetExceptionObject<std::logic_error>()) {
+      return Try<std::string>(err->what());
+    }
+    return Try<std::string>(
+        make_exception_wrapper<std::logic_error>("Exception"));
+  });
+  p.setException(make_exception_wrapper<std::logic_error>("Try"));
+  auto tryResult = std::move(sf).get();
+  ASSERT_EQ(tryResult.value(), "Try");
+}