Add keepAlive() mechanism
authorAndrii Grynenko <andrii@fb.com>
Thu, 9 Feb 2017 18:58:07 +0000 (10:58 -0800)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Thu, 9 Feb 2017 19:04:51 +0000 (11:04 -0800)
Summary:
EventBase and VirtualEventBase already had a loopKeepAlive() mechanism, which enabled libraries to prevent EventBase/VirtualEventBase from being destroyed until all keep-alive tokens were released.

This change adds generic keepAlive() support into folly::Executor. folly::Executors which don't support keep-alive mechanism yet, will just return a no-op KeepAlive token.

Reviewed By: yfeldblum

Differential Revision: D4516649

fbshipit-source-id: 869779621c746cb14d985aa73bc4536859914c03

folly/Executor.cpp
folly/Executor.h
folly/fibers/EventBaseLoopController-inl.h
folly/fibers/EventBaseLoopController.h
folly/io/async/EventBase.h
folly/io/async/VirtualEventBase.cpp
folly/io/async/VirtualEventBase.h
folly/io/async/test/EventBaseTest.cpp

index 91f9884..42d1f04 100644 (file)
 
 #include <stdexcept>
 
+#include <glog/logging.h>
+
 namespace folly {
 
 void Executor::addWithPriority(Func, int8_t /* priority */) {
   throw std::runtime_error(
       "addWithPriority() is not implemented for this Executor");
 }
+
+void Executor::keepAliveRelease() {
+  LOG(FATAL) << "keepAliveRelease() should not be called for folly::Executors "
+             << "which do not implement getKeepAliveToken()";
+}
 }
index 258100c..b4f1077 100644 (file)
@@ -55,6 +55,47 @@ class Executor {
   void addPtr(P fn) {
     this->add([fn]() mutable { (*fn)(); });
   }
+
+  class KeepAlive {
+   public:
+    KeepAlive() {}
+
+    void reset() {
+      executor_.reset();
+    }
+
+    explicit operator bool() const {
+      return executor_ != nullptr;
+    }
+
+   private:
+    friend class Executor;
+    explicit KeepAlive(folly::Executor* executor) : executor_(executor) {}
+
+    struct Deleter {
+      void operator()(folly::Executor* executor) {
+        executor->keepAliveRelease();
+      }
+    };
+    std::unique_ptr<folly::Executor, Deleter> executor_;
+  };
+
+  /// Returns a keep-alive token which guarantees that Executor will keep
+  /// processing tasks until the token is released. keep-alive token can only
+  /// be destroyed from within the task, scheduled to be run on an executor.
+  ///
+  /// If executor does not support keep-alive functionality - dummy token will
+  /// be returned.
+  virtual KeepAlive getKeepAliveToken() {
+    return {};
+  }
+
+ protected:
+  virtual void keepAliveRelease();
+
+  KeepAlive makeKeepAlive() {
+    return KeepAlive{this};
+  }
 };
 
 } // folly
index 09f260c..025341c 100644 (file)
@@ -72,7 +72,7 @@ inline void EventBaseLoopControllerT<EventBaseT>::cancel() {
 template <typename EventBaseT>
 inline void EventBaseLoopControllerT<EventBaseT>::runLoop() {
   if (!eventBaseKeepAlive_) {
-    eventBaseKeepAlive_ = eventBase_->loopKeepAlive();
+    eventBaseKeepAlive_ = eventBase_->getKeepAliveToken();
   }
   if (loopRunner_) {
     loopRunner_->run([&] { fm_->loopUntilNoReadyImpl(); });
index bdc6cd7..f389a45 100644 (file)
@@ -93,7 +93,7 @@ class EventBaseLoopControllerT : public LoopController {
 
   bool awaitingScheduling_{false};
   EventBaseT* eventBase_{nullptr};
-  typename EventBaseT::LoopKeepAlive eventBaseKeepAlive_;
+  Executor::KeepAlive eventBaseKeepAlive_;
   ControllerCallback callback_;
   DestructionCallback destructionCallback_;
   FiberManager* fm_{nullptr};
index cc23fed..c318d60 100644 (file)
@@ -600,33 +600,17 @@ class EventBase : private boost::noncopyable,
     loopOnce();
   }
 
-  struct LoopKeepAliveDeleter {
-    void operator()(EventBase* evb) {
-      DCHECK(evb->isInEventBaseThread());
-      evb->loopKeepAliveCount_--;
-    }
-  };
-  using LoopKeepAlive = std::unique_ptr<EventBase, LoopKeepAliveDeleter>;
-
   /// Returns you a handle which make loop() behave like loopForever() until
   /// destroyed. loop() will return to its original behavior only when all
   /// loop keep-alives are released. Loop holder is safe to release only from
   /// EventBase thread.
-  ///
-  /// May return no op LoopKeepAlive if loopForever() is already running.
-  LoopKeepAlive loopKeepAlive() {
-    DCHECK(isInEventBaseThread());
-    loopKeepAliveCount_++;
-    return LoopKeepAlive(this);
-  }
-
-  // Thread-safe version of loopKeepAlive()
-  LoopKeepAlive loopKeepAliveAtomic() {
+  KeepAlive getKeepAliveToken() override {
     if (inRunningEventBaseThread()) {
-      return loopKeepAlive();
+      loopKeepAliveCount_++;
+    } else {
+      loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
     }
-    loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
-    return LoopKeepAlive(this);
+    return makeKeepAlive();
   }
 
   // TimeoutManager
@@ -645,6 +629,12 @@ class EventBase : private boost::noncopyable,
     return isInEventBaseThread();
   }
 
+ protected:
+  void keepAliveRelease() override {
+    DCHECK(isInEventBaseThread());
+    loopKeepAliveCount_--;
+  }
+
  private:
   void applyLoopKeepAlive();
 
index cccac60..c1dbe2e 100644 (file)
@@ -18,8 +18,8 @@
 namespace folly {
 
 VirtualEventBase::VirtualEventBase(EventBase& evb) : evb_(evb) {
-  evbLoopKeepAlive_ = evb_.loopKeepAliveAtomic();
-  loopKeepAlive_ = loopKeepAliveAtomic();
+  evbLoopKeepAlive_ = evb_.getKeepAliveToken();
+  loopKeepAlive_ = getKeepAliveToken();
 }
 
 VirtualEventBase::~VirtualEventBase() {
index f3be265..f356d50 100644 (file)
@@ -27,7 +27,7 @@ namespace folly {
  *
  * Multiple VirtualEventBases can be backed by a single EventBase. Similarly
  * to EventBase, VirtualEventBase implements loopKeepAlive() functionality,
- * which allows callbacks holding LoopKeepAlive token to keep EventBase looping
+ * which allows callbacks holding KeepAlive token to keep EventBase looping
  * until they are complete.
  *
  * VirtualEventBase destructor blocks until all its KeepAliveTokens are released
@@ -75,11 +75,11 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager {
    */
   template <typename F>
   void runInEventBaseThread(F&& f) {
-    // LoopKeepAlive token has to be released in the EventBase thread. If
-    // runInEventBaseThread() fails, we can't extract the LoopKeepAlive token
+    // KeepAlive token has to be released in the EventBase thread. If
+    // runInEventBaseThread() fails, we can't extract the KeepAlive token
     // from the callback to properly release it.
     CHECK(evb_.runInEventBaseThread([
-      keepAlive = loopKeepAliveAtomic(),
+      keepAliveToken = getKeepAliveToken(),
       f = std::forward<F>(f)
     ]() mutable { f(); }));
   }
@@ -122,47 +122,35 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager {
     runInEventBaseThread(std::move(f));
   }
 
-  struct LoopKeepAliveDeleter {
-    void operator()(VirtualEventBase* evb) {
-      DCHECK(evb->getEventBase().inRunningEventBaseThread());
-      if (evb->loopKeepAliveCountAtomic_.load()) {
-        evb->loopKeepAliveCount_ += evb->loopKeepAliveCountAtomic_.exchange(0);
-      }
-      DCHECK(evb->loopKeepAliveCount_ > 0);
-      if (--evb->loopKeepAliveCount_ == 0) {
-        evb->loopKeepAliveBaton_.post();
-      }
-    }
-  };
-  using LoopKeepAlive = std::unique_ptr<VirtualEventBase, LoopKeepAliveDeleter>;
-
   /**
    * Returns you a handle which prevents VirtualEventBase from being destroyed.
-   * LoopKeepAlive handle can be released from EventBase loop only.
-   *
-   * loopKeepAlive() can be called from EventBase thread only.
+   * KeepAlive handle can be released from EventBase loop only.
    */
-  LoopKeepAlive loopKeepAlive() {
-    DCHECK(evb_.isInEventBaseThread());
-    ++loopKeepAliveCount_;
-    return LoopKeepAlive(this);
-  }
-
-  /**
-   * Thread-safe version of loopKeepAlive()
-   */
-  LoopKeepAlive loopKeepAliveAtomic() {
+  KeepAlive getKeepAliveToken() override {
     if (evb_.inRunningEventBaseThread()) {
-      return loopKeepAlive();
+      ++loopKeepAliveCount_;
+    } else {
+      ++loopKeepAliveCountAtomic_;
     }
-    ++loopKeepAliveCountAtomic_;
-    return LoopKeepAlive(this);
+    return makeKeepAlive();
   }
 
   bool inRunningEventBaseThread() const {
     return evb_.inRunningEventBaseThread();
   }
 
+ protected:
+  void keepAliveRelease() override {
+    DCHECK(getEventBase().inRunningEventBaseThread());
+    if (loopKeepAliveCountAtomic_.load()) {
+      loopKeepAliveCount_ += loopKeepAliveCountAtomic_.exchange(0);
+    }
+    DCHECK(loopKeepAliveCount_ > 0);
+    if (--loopKeepAliveCount_ == 0) {
+      loopKeepAliveBaton_.post();
+    }
+  }
+
  private:
   using LoopCallbackList = EventBase::LoopCallback::List;
 
@@ -171,9 +159,9 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager {
   ssize_t loopKeepAliveCount_{0};
   std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
   folly::Baton<> loopKeepAliveBaton_;
-  LoopKeepAlive loopKeepAlive_;
+  KeepAlive loopKeepAlive_;
 
-  EventBase::LoopKeepAlive evbLoopKeepAlive_;
+  KeepAlive evbLoopKeepAlive_;
 
   folly::Synchronized<LoopCallbackList> onDestructionCallbacks_;
 };
index 454ece3..5528dc3 100644 (file)
@@ -1,4 +1,6 @@
 /*
+ * Copyright 2017-present Facebook, Inc.
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +18,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 #include <folly/Memory.h>
 #include <folly/ScopeGuard.h>
 
@@ -1736,7 +1737,7 @@ TEST(EventBaseTest, LoopKeepAlive) {
   EventBase evb;
 
   bool done = false;
-  std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
+  std::thread t([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
     /* sleep override */ std::this_thread::sleep_for(
         std::chrono::milliseconds(100));
     evb.runInEventBaseThread(
@@ -1757,7 +1758,7 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) {
   std::thread t;
 
   evb.runInEventBaseThread([&] {
-    t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
+    t = std::thread([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
       /* sleep override */ std::this_thread::sleep_for(
           std::chrono::milliseconds(100));
       evb.runInEventBaseThread(
@@ -1785,9 +1786,9 @@ TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
 
   {
     auto* ev = evb.get();
-    EventBase::LoopKeepAlive keepAlive;
+    Executor::KeepAlive keepAlive;
     ev->runInEventBaseThreadAndWait(
-        [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
+        [&ev, &keepAlive] { keepAlive = ev->getKeepAliveToken(); });
     ASSERT_FALSE(done) << "Loop finished before we asked it to";
     ev->terminateLoopSoon();
     /* sleep override */
@@ -1807,7 +1808,7 @@ TEST(EventBaseTest, LoopKeepAliveShutdown) {
 
   std::thread t([
     &done,
-    loopKeepAlive = evb->loopKeepAlive(),
+    loopKeepAlive = evb->getKeepAliveToken(),
     evbPtr = evb.get()
   ]() mutable {
     /* sleep override */ std::this_thread::sleep_for(
@@ -1839,9 +1840,9 @@ TEST(EventBaseTest, LoopKeepAliveAtomic) {
 
   for (size_t i = 0; i < kNumThreads; ++i) {
     ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
-      std::vector<EventBase::LoopKeepAlive> keepAlives;
+      std::vector<Executor::KeepAlive> keepAlives;
       for (size_t j = 0; j < kNumTasks; ++j) {
-        keepAlives.emplace_back(evbPtr->loopKeepAliveAtomic());
+        keepAlives.emplace_back(evbPtr->getKeepAliveToken());
       }
 
       batonPtr->post();