Fix a race in Observable context destruction
[folly.git] / folly / experimental / observer / detail / ObserverManager.cpp
index f14b9092475f759c6b7d98203558fd6642180478..f909ef57f3da31a9d88df714e4157e1b10ba51d7 100644 (file)
 #include <folly/experimental/observer/detail/ObserverManager.h>
 
 #include <folly/ExceptionString.h>
+#include <folly/Format.h>
 #include <folly/MPMCQueue.h>
+#include <folly/Range.h>
 #include <folly/Singleton.h>
+#include <folly/ThreadName.h>
 #include <folly/portability/GFlags.h>
 
 namespace folly {
@@ -32,6 +35,8 @@ DEFINE_int32(
     4,
     "How many internal threads ObserverManager should use");
 
+static constexpr StringPiece kObserverManagerThreadNamePrefix{"ObserverMngr"};
+
 namespace {
 constexpr size_t kCurrentQueueSize{10 * 1024};
 constexpr size_t kNextQueueSize{10 * 1024};
@@ -45,7 +50,9 @@ class ObserverManager::CurrentQueue {
       FLAGS_observer_manager_pool_size = 1;
     }
     for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) {
-      threads_.emplace_back([&]() {
+      threads_.emplace_back([this, i]() {
+        folly::setThreadName(
+            folly::sformat("{}{}", kObserverManagerThreadNamePrefix, i));
         ObserverManager::inManagerThread_ = true;
 
         while (true) {
@@ -99,28 +106,35 @@ class ObserverManager::NextQueue {
   explicit NextQueue(ObserverManager& manager)
       : manager_(manager), queue_(kNextQueueSize) {
     thread_ = std::thread([&]() {
-      Core::Ptr queueCore;
+      Core::WeakPtr queueCoreWeak;
 
       while (true) {
-        queue_.blockingRead(queueCore);
-
-        if (!queueCore) {
+        queue_.blockingRead(queueCoreWeak);
+        if (stop_) {
           return;
         }
 
         std::vector<Core::Ptr> cores;
-        cores.emplace_back(std::move(queueCore));
+        {
+          auto queueCore = queueCoreWeak.lock();
+          if (!queueCore) {
+            continue;
+          }
+          cores.emplace_back(std::move(queueCore));
+        }
 
         {
           SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
 
           // We can't pick more tasks from the queue after we bumped the
           // version, so we have to do this while holding the lock.
-          while (cores.size() < kNextQueueSize && queue_.read(queueCore)) {
-            if (!queueCore) {
+          while (cores.size() < kNextQueueSize && queue_.read(queueCoreWeak)) {
+            if (stop_) {
               return;
             }
-            cores.emplace_back(std::move(queueCore));
+            if (auto queueCore = queueCoreWeak.lock()) {
+              cores.emplace_back(std::move(queueCore));
+            }
           }
 
           ++manager_.version_;
@@ -133,20 +147,22 @@ class ObserverManager::NextQueue {
     });
   }
 
-  void add(Core::Ptr core) {
+  void add(Core::WeakPtr core) {
     queue_.blockingWrite(std::move(core));
   }
 
   ~NextQueue() {
-    // Emtpy element signals thread to terminate
-    queue_.blockingWrite(nullptr);
+    stop_ = true;
+    // Write to the queue to notify the thread.
+    queue_.blockingWrite(Core::WeakPtr());
     thread_.join();
   }
 
  private:
   ObserverManager& manager_;
-  MPMCQueue<Core::Ptr> queue_;
+  MPMCQueue<Core::WeakPtr> queue_;
   std::thread thread_;
+  std::atomic<bool> stop_{false};
 };
 
 ObserverManager::ObserverManager() {
@@ -165,7 +181,7 @@ void ObserverManager::scheduleCurrent(Function<void()> task) {
   currentQueue_->add(std::move(task));
 }
 
-void ObserverManager::scheduleNext(Core::Ptr core) {
+void ObserverManager::scheduleNext(Core::WeakPtr core) {
   nextQueue_->add(std::move(core));
 }