Implement LoopKeepAlive for EventBase
authorAndrii Grynenko <andrii@fb.com>
Thu, 5 May 2016 20:22:42 +0000 (13:22 -0700)
committerFacebook Github Bot 6 <facebook-github-bot-6-bot@fb.com>
Thu, 5 May 2016 20:35:42 +0000 (13:35 -0700)
Summary: LoopKeepAlive can be useful to tell EventBase, that loop() shouldn't return even if there are no events registered, because some tasks will later be added via runInEventBaseThread. While at least one LoopKeepAlive is alive - EventBase::loop() behaves like EventBase::loopForever().

Reviewed By: yfeldblum

Differential Revision: D3261706

fb-gh-sync-id: d91424d3d12cae11abd72cffdbd57f136f628dae
fbshipit-source-id: d91424d3d12cae11abd72cffdbd57f136f628dae

folly/io/async/EventBase.cpp
folly/io/async/EventBase.h
folly/io/async/test/EventBaseTest.cpp

index 6f7e9a27425e264be6539998aaa73d87a55fdd28..5c0b78129e37a39e9f4da5ecbba45604b9363710 100644 (file)
@@ -319,6 +319,7 @@ bool EventBase::loopBody(int flags) {
   }
 
   while (!stop_.load(std::memory_order_acquire)) {
+    applyLoopKeepAlive();
     ++nextLoopCnt_;
 
     // Run the before loop callbacks
@@ -425,18 +426,34 @@ bool EventBase::loopBody(int flags) {
   return true;
 }
 
-void EventBase::loopForever() {
-  // Update the notification queue event to treat it as a normal (non-internal)
-  // event.  The notification queue event always remains installed, and the main
-  // loop won't exit with it installed.
-  fnRunner_->stopConsuming();
-  fnRunner_->startConsuming(this, queue_.get());
-
-  bool ret = loop();
+void EventBase::applyLoopKeepAlive() {
+  if (loopKeepAliveActive_ && loopKeepAlive_.unique()) {
+    // Restore the notification queue internal flag
+    fnRunner_->stopConsuming();
+    fnRunner_->startConsumingInternal(this, queue_.get());
+    loopKeepAliveActive_ = false;
+  } else if (!loopKeepAliveActive_ && !loopKeepAlive_.unique()) {
+    // Update the notification queue event to treat it as a normal
+    // (non-internal) event.  The notification queue event always remains
+    // installed, and the main loop won't exit with it installed.
+    fnRunner_->stopConsuming();
+    fnRunner_->startConsuming(this, queue_.get());
+    loopKeepAliveActive_ = true;
+  }
+}
 
-  // Restore the notification queue internal flag
-  fnRunner_->stopConsuming();
-  fnRunner_->startConsumingInternal(this, queue_.get());
+void EventBase::loopForever() {
+  bool ret;
+  {
+    SCOPE_EXIT {
+      applyLoopKeepAlive();
+      loopForeverActive_ = false;
+    };
+    loopForeverActive_ = true;
+    // Make sure notification queue events are treated as normal events.
+    auto loopKeepAlive = loopKeepAlive_;
+    ret = loop();
+  }
 
   if (!ret) {
     folly::throwSystemError("error in EventBase::loopForever()");
index e678781e320e22f2e569855d5a1470c65b8252ba..5999687d4da6d17c8e49ea967be3601effa81e43 100644 (file)
@@ -586,6 +586,22 @@ class EventBase : private boost::noncopyable,
     loopOnce();
   }
 
+  using LoopKeepAlive = std::shared_ptr<void>;
+
+  /// Returns you a handle which make loop() behave like loopForever() until
+  /// destroyed. loop() will return to its original behavior only when all
+  /// loop keep-alives are released. Loop holder is safe to release only from
+  /// EventBase thread.
+  ///
+  /// May return no op LoopKeepAlive if loopForever() is already running.
+  LoopKeepAlive loopKeepAlive() {
+    if (loopForeverActive_) {
+      return nullptr;
+    } else {
+      return loopKeepAlive_;
+    }
+  }
+
  private:
   // TimeoutManager
   void attachTimeoutManager(AsyncTimeout* obj,
@@ -602,6 +618,8 @@ class EventBase : private boost::noncopyable,
     return isInEventBaseThread();
   }
 
+  void applyLoopKeepAlive();
+
   /*
    * Helper function that tells us whether we have already handled
    * some event/timeout/callback in this loop iteration.
@@ -673,6 +691,9 @@ class EventBase : private boost::noncopyable,
   // to send function requests to the EventBase thread.
   std::unique_ptr<NotificationQueue<Func>> queue_;
   std::unique_ptr<FunctionRunner> fnRunner_;
+  LoopKeepAlive loopKeepAlive_{std::make_shared<int>(42)};
+  bool loopKeepAliveActive_{false};
+  std::atomic<bool> loopForeverActive_{false};
 
   // limit for latency in microseconds (0 disables)
   int64_t maxLatency_;
index 8a456065150e25c543152b2d04d3af533fecb92a..7cb49a4dd70cbbbc7a973593ac9631ed1ef72a47 100644 (file)
@@ -1728,3 +1728,41 @@ TEST(EventBaseTest, RunCallbacksOnDestruction) {
 
   ASSERT_TRUE(ran);
 }
+
+TEST(EventBaseTest, LoopKeepAlive) {
+  EventBase evb;
+
+  bool done = false;
+  std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ] {
+    /* sleep override */ std::this_thread::sleep_for(
+        std::chrono::milliseconds(100));
+    evb.runInEventBaseThread([&] { done = true; });
+  });
+
+  evb.loop();
+
+  ASSERT_TRUE(done);
+
+  t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveInLoop) {
+  EventBase evb;
+
+  bool done = false;
+  std::thread t;
+
+  evb.runInEventBaseThread([&] {
+    t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ] {
+      /* sleep override */ std::this_thread::sleep_for(
+          std::chrono::milliseconds(100));
+      evb.runInEventBaseThread([&] { done = true; });
+    });
+  });
+
+  evb.loop();
+
+  ASSERT_TRUE(done);
+
+  t.join();
+}