Fix a race in Observable context destruction
[folly.git] / folly / experimental / observer / detail / ObserverManager.h
index 6319bedb6128f707b9a0fbb71f2f78f3400e6804..cfb1e70a05eef1a04c46e2bf8c98dbc9b534ca0d 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
 #pragma once
 
 #include <folly/experimental/observer/detail/Core.h>
+#include <folly/experimental/observer/detail/GraphCycleDetector.h>
 #include <folly/futures/Future.h>
 
 namespace folly {
@@ -92,26 +93,32 @@ class ObserverManager {
     return future;
   }
 
-  static void scheduleRefreshNewVersion(Core::Ptr core) {
-    if (core->getVersion() == 0) {
-      scheduleRefresh(std::move(core), 1).get();
-      return;
-    }
-
+  static void scheduleRefreshNewVersion(Core::WeakPtr coreWeak) {
     auto instance = getInstance();
 
     if (!instance) {
       return;
     }
 
-    instance->scheduleNext(std::move(core));
+    instance->scheduleNext(std::move(coreWeak));
+  }
+
+  static void initCore(Core::Ptr core) {
+    DCHECK(core->getVersion() == 0);
+    scheduleRefresh(std::move(core), 1).get();
   }
 
   class DependencyRecorder {
    public:
-    using Dependencies = std::unordered_set<Core::Ptr>;
+    using DependencySet = std::unordered_set<Core::Ptr>;
+    struct Dependencies {
+      explicit Dependencies(const Core& core_) : core(core_) {}
 
-    DependencyRecorder() {
+      DependencySet dependencies;
+      const Core& core;
+    };
+
+    explicit DependencyRecorder(const Core& core) : dependencies_(core) {
       DCHECK(inManagerThread());
 
       previousDepedencies_ = currentDependencies_;
@@ -122,19 +129,47 @@ class ObserverManager {
       DCHECK(inManagerThread());
       DCHECK(currentDependencies_);
 
-      currentDependencies_->insert(std::move(dependency));
+      currentDependencies_->dependencies.insert(std::move(dependency));
     }
 
-    Dependencies release() {
+    static void markRefreshDependency(const Core& core) {
+      if (!currentDependencies_) {
+        return;
+      }
+
+      if (auto instance = getInstance()) {
+        instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
+          bool hasCycle =
+              !cycleDetector.addEdge(&currentDependencies_->core, &core);
+          if (hasCycle) {
+            throw std::logic_error("Observer cycle detected.");
+          }
+        });
+      }
+    }
+
+    static void unmarkRefreshDependency(const Core& core) {
+      if (!currentDependencies_) {
+        return;
+      }
+
+      if (auto instance = getInstance()) {
+        instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
+          cycleDetector.removeEdge(&currentDependencies_->core, &core);
+        });
+      }
+    }
+
+    DependencySet release() {
       DCHECK(currentDependencies_ == &dependencies_);
       std::swap(currentDependencies_, previousDepedencies_);
       previousDepedencies_ = nullptr;
 
-      return std::move(dependencies_);
+      return std::move(dependencies_.dependencies);
     }
 
     ~DependencyRecorder() {
-      if (previousDepedencies_) {
+      if (currentDependencies_ == &dependencies_) {
         release();
       }
     }
@@ -154,7 +189,7 @@ class ObserverManager {
   struct Singleton;
 
   void scheduleCurrent(Function<void()>);
-  void scheduleNext(Core::Ptr);
+  void scheduleNext(Core::WeakPtr);
 
   class CurrentQueue;
   class NextQueue;
@@ -176,6 +211,9 @@ class ObserverManager {
    */
   SharedMutexReadPriority versionMutex_;
   std::atomic<size_t> version_{1};
+
+  using CycleDetector = GraphCycleDetector<const Core*>;
+  folly::Synchronized<CycleDetector, std::mutex> cycleDetector_;
 };
 }
 }