Cycle detection
authorAndrii Grynenko <andrii@fb.com>
Wed, 24 Aug 2016 18:24:33 +0000 (11:24 -0700)
committerFacebook Github Bot 4 <facebook-github-bot-4-bot@fb.com>
Wed, 24 Aug 2016 18:39:10 +0000 (11:39 -0700)
Summary:
1. This implements a GraphCycleDetector which can check if newly added edge belongs to a cycle in a directed graph. GraphCycleDetector is used to detect cycles between Observers when creator function is run.
2. This also fixes a bug where new dependencies could be saved even if Observer creator failed.

Reviewed By: yfeldblum

Differential Revision: D3746743

fbshipit-source-id: 99d10446c56fa4d8f7485f38309e8a282cd21bdf

folly/Makefile.am
folly/experimental/observer/detail/Core.cpp
folly/experimental/observer/detail/GraphCycleDetector.h [new file with mode: 0644]
folly/experimental/observer/detail/ObserverManager.h
folly/experimental/observer/test/ObserverTest.cpp

index a51285f4157e0484fac57c3e7866676cc21c1aaa..fb3b5582abdda98b8fe87f3f7c386c179cff7783 100644 (file)
@@ -107,6 +107,7 @@ nobase_follyinclude_HEADERS = \
        experimental/LockFreeRingBuffer.h \
        experimental/NestedCommandLineApp.h \
        experimental/observer/detail/Core.h \
+       experimental/observer/detail/GraphCycleDetector.h \
        experimental/observer/detail/ObserverManager.h \
        experimental/observer/detail/Observer-pre.h \
        experimental/observer/Observable.h \
index c4e317ec65eaaafb5693c706ed339d76165e728c..a0372bffd62e6a524fe2c7ede3547d7a97513aba 100644 (file)
@@ -41,12 +41,15 @@ Core::VersionedData Core::getData() {
 size_t Core::refresh(size_t version, bool force) {
   CHECK(ObserverManager::inManagerThread());
 
+  ObserverManager::DependencyRecorder::markRefreshDependency(*this);
+  SCOPE_EXIT {
+    ObserverManager::DependencyRecorder::unmarkRefreshDependency(*this);
+  };
+
   if (version_ >= version) {
     return versionLastChange_;
   }
 
-  bool refreshDependents = false;
-
   {
     std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
 
@@ -57,11 +60,21 @@ size_t Core::refresh(size_t version, bool force) {
 
     bool needRefresh = force || version_ == 0;
 
+    ObserverManager::DependencyRecorder dependencyRecorder(*this);
+
     // This can be run in parallel, but we expect most updates to propagate
     // bottom to top.
     dependencies_.withRLock([&](const Dependencies& dependencies) {
       for (const auto& dependency : dependencies) {
-        if (dependency->refresh(version) > version_) {
+        try {
+          if (dependency->refresh(version) > version_) {
+            needRefresh = true;
+            break;
+          }
+        } catch (...) {
+          LOG(ERROR) << "Exception while checking dependencies for updates: "
+                     << exceptionStr(std::current_exception());
+
           needRefresh = true;
           break;
         }
@@ -73,8 +86,6 @@ size_t Core::refresh(size_t version, bool force) {
       return versionLastChange_;
     }
 
-    ObserverManager::DependencyRecorder dependencyRecorder;
-
     try {
       {
         VersionedData newData{creator_(), version};
@@ -85,7 +96,6 @@ size_t Core::refresh(size_t version, bool force) {
       }
 
       versionLastChange_ = version;
-      refreshDependents = true;
     } catch (...) {
       LOG(ERROR) << "Exception while refreshing Observer: "
                  << exceptionStr(std::current_exception());
@@ -98,6 +108,10 @@ size_t Core::refresh(size_t version, bool force) {
 
     version_ = version;
 
+    if (versionLastChange_ != version) {
+      return versionLastChange_;
+    }
+
     auto newDependencies = dependencyRecorder.release();
     dependencies_.withWLock([&](Dependencies& dependencies) {
       for (const auto& dependency : newDependencies) {
@@ -116,13 +130,11 @@ size_t Core::refresh(size_t version, bool force) {
     });
   }
 
-  if (refreshDependents) {
-    auto dependents = dependents_.copy();
+  auto dependents = dependents_.copy();
 
-    for (const auto& dependentWeak : dependents) {
-      if (auto dependent = dependentWeak.lock()) {
-        ObserverManager::scheduleRefresh(std::move(dependent), version);
-      }
+  for (const auto& dependentWeak : dependents) {
+    if (auto dependent = dependentWeak.lock()) {
+      ObserverManager::scheduleRefresh(std::move(dependent), version);
     }
   }
 
diff --git a/folly/experimental/observer/detail/GraphCycleDetector.h b/folly/experimental/observer/detail/GraphCycleDetector.h
new file mode 100644 (file)
index 0000000..5b0945a
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <unordered_map>
+#include <unordered_set>
+
+namespace folly {
+namespace observer_detail {
+
+template <typename NodeId>
+class GraphCycleDetector {
+  using NodeSet = std::unordered_set<NodeId>;
+
+ public:
+  /**
+   * Add new edge. If edge creates a cycle then it's not added and false is
+   * returned.
+   */
+  bool addEdge(NodeId from, NodeId to) {
+    // In general case DFS may be expensive here, but in most cases to-node will
+    // have no edges, so it should be O(1).
+    NodeSet visitedNodes;
+    dfs(visitedNodes, to);
+    if (visitedNodes.count(from)) {
+      return false;
+    }
+
+    auto& nodes = edges_[from];
+    DCHECK_EQ(0, nodes.count(to));
+    nodes.insert(to);
+
+    return true;
+  }
+
+  void removeEdge(NodeId from, NodeId to) {
+    auto& nodes = edges_[from];
+    DCHECK(nodes.count(to));
+    nodes.erase(to);
+    if (nodes.empty()) {
+      edges_.erase(from);
+    }
+  }
+
+ private:
+  void dfs(NodeSet& visitedNodes, NodeId node) {
+    // We don't terminate early if cycle is detected, because this is considered
+    // an error condition, so not worth optimizing for.
+    if (visitedNodes.count(node)) {
+      return;
+    }
+
+    visitedNodes.insert(node);
+
+    if (!edges_.count(node)) {
+      return;
+    }
+
+    for (const auto& to : edges_[node]) {
+      dfs(visitedNodes, to);
+    }
+  }
+
+  std::unordered_map<NodeId, NodeSet> edges_;
+};
+}
+}
index 6319bedb6128f707b9a0fbb71f2f78f3400e6804..49ceb67c11d0d36dc27a29632d283aa3b45e4718 100644 (file)
@@ -16,6 +16,7 @@
 #pragma once
 
 #include <folly/experimental/observer/detail/Core.h>
+#include <folly/experimental/observer/detail/GraphCycleDetector.h>
 #include <folly/futures/Future.h>
 
 namespace folly {
@@ -109,9 +110,15 @@ class ObserverManager {
 
   class DependencyRecorder {
    public:
-    using Dependencies = std::unordered_set<Core::Ptr>;
+    using DependencySet = std::unordered_set<Core::Ptr>;
+    struct Dependencies {
+      explicit Dependencies(const Core& core_) : core(core_) {}
 
-    DependencyRecorder() {
+      DependencySet dependencies;
+      const Core& core;
+    };
+
+    explicit DependencyRecorder(const Core& core) : dependencies_(core) {
       DCHECK(inManagerThread());
 
       previousDepedencies_ = currentDependencies_;
@@ -122,19 +129,47 @@ class ObserverManager {
       DCHECK(inManagerThread());
       DCHECK(currentDependencies_);
 
-      currentDependencies_->insert(std::move(dependency));
+      currentDependencies_->dependencies.insert(std::move(dependency));
+    }
+
+    static void markRefreshDependency(const Core& core) {
+      if (!currentDependencies_) {
+        return;
+      }
+
+      if (auto instance = getInstance()) {
+        instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
+          bool hasCycle =
+              !cycleDetector.addEdge(&currentDependencies_->core, &core);
+          if (hasCycle) {
+            throw std::logic_error("Observer cycle detected.");
+          }
+        });
+      }
+    }
+
+    static void unmarkRefreshDependency(const Core& core) {
+      if (!currentDependencies_) {
+        return;
+      }
+
+      if (auto instance = getInstance()) {
+        instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
+          cycleDetector.removeEdge(&currentDependencies_->core, &core);
+        });
+      }
     }
 
-    Dependencies release() {
+    DependencySet release() {
       DCHECK(currentDependencies_ == &dependencies_);
       std::swap(currentDependencies_, previousDepedencies_);
       previousDepedencies_ = nullptr;
 
-      return std::move(dependencies_);
+      return std::move(dependencies_.dependencies);
     }
 
     ~DependencyRecorder() {
-      if (previousDepedencies_) {
+      if (currentDependencies_ == &dependencies_) {
         release();
       }
     }
@@ -176,6 +211,9 @@ class ObserverManager {
    */
   SharedMutexReadPriority versionMutex_;
   std::atomic<size_t> version_{1};
+
+  using CycleDetector = GraphCycleDetector<const Core*>;
+  folly::Synchronized<CycleDetector, std::mutex> cycleDetector_;
 };
 }
 }
index abfbc368491b53205713219e54c21b2ddbe67c6f..6ace4203ae03973860ab2a46ab02078344e2bded 100644 (file)
@@ -148,6 +148,64 @@ TEST(Observer, NullValue) {
   EXPECT_EQ(46, **oddObserver);
 }
 
+TEST(Observer, Cycle) {
+  SimpleObservable<int> observable(0);
+  auto observer = observable.getObserver();
+  folly::Optional<Observer<int>> observerB;
+
+  auto observerA = makeObserver([observer, &observerB]() {
+    auto value = **observer;
+    if (value == 1) {
+      **observerB;
+    }
+    return value;
+  });
+
+  observerB = makeObserver([observerA]() { return **observerA; });
+
+  auto collectObserver = makeObserver([observer, observerA, &observerB]() {
+    auto value = **observer;
+    auto valueA = **observerA;
+    auto valueB = ***observerB;
+
+    if (value == 1) {
+      if (valueA == 0) {
+        EXPECT_EQ(0, valueB);
+      } else {
+        EXPECT_EQ(1, valueA);
+        EXPECT_EQ(0, valueB);
+      }
+    } else if (value == 2) {
+      EXPECT_EQ(value, valueA);
+      EXPECT_TRUE(valueB == 0 || valueB == 2);
+    } else {
+      EXPECT_EQ(value, valueA);
+      EXPECT_EQ(value, valueB);
+    }
+
+    return value;
+  });
+
+  folly::Baton<> baton;
+  auto waitingObserver = makeObserver([collectObserver, &baton]() {
+    *collectObserver;
+    baton.post();
+    return folly::Unit();
+  });
+
+  baton.reset();
+  EXPECT_EQ(0, **collectObserver);
+
+  for (size_t i = 1; i <= 3; ++i) {
+    observable.setValue(i);
+
+    EXPECT_TRUE(baton.timed_wait(std::chrono::seconds{1}));
+    baton.reset();
+
+    EXPECT_EQ(i, **collectObserver);
+  }
+}
+
 TEST(Observer, Stress) {
   SimpleObservable<int> observable(0);