folly::Observer
[folly.git] / folly / experimental / observer / detail / Core.cpp
diff --git a/folly/experimental/observer/detail/Core.cpp b/folly/experimental/observer/detail/Core.cpp
new file mode 100644 (file)
index 0000000..c4e317e
--- /dev/null
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/observer/detail/Core.h>
+#include <folly/experimental/observer/detail/ObserverManager.h>
+
+namespace folly {
+namespace observer_detail {
+
+Core::VersionedData Core::getData() {
+  if (!ObserverManager::inManagerThread()) {
+    return data_.copy();
+  }
+
+  ObserverManager::DependencyRecorder::markDependency(shared_from_this());
+
+  auto version = ObserverManager::getVersion();
+
+  if (version_ >= version) {
+    return data_.copy();
+  }
+
+  refresh(version);
+
+  DCHECK_GE(version_, version);
+  return data_.copy();
+}
+
+size_t Core::refresh(size_t version, bool force) {
+  CHECK(ObserverManager::inManagerThread());
+
+  if (version_ >= version) {
+    return versionLastChange_;
+  }
+
+  bool refreshDependents = false;
+
+  {
+    std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
+
+    // Recheck in case this code was already refreshed
+    if (version_ >= version) {
+      return versionLastChange_;
+    }
+
+    bool needRefresh = force || version_ == 0;
+
+    // This can be run in parallel, but we expect most updates to propagate
+    // bottom to top.
+    dependencies_.withRLock([&](const Dependencies& dependencies) {
+      for (const auto& dependency : dependencies) {
+        if (dependency->refresh(version) > version_) {
+          needRefresh = true;
+          break;
+        }
+      }
+    });
+
+    if (!needRefresh) {
+      version_ = version;
+      return versionLastChange_;
+    }
+
+    ObserverManager::DependencyRecorder dependencyRecorder;
+
+    try {
+      {
+        VersionedData newData{creator_(), version};
+        if (!newData.data) {
+          throw std::logic_error("Observer creator returned nullptr.");
+        }
+        data_.swap(newData);
+      }
+
+      versionLastChange_ = version;
+      refreshDependents = true;
+    } catch (...) {
+      LOG(ERROR) << "Exception while refreshing Observer: "
+                 << exceptionStr(std::current_exception());
+
+      if (version_ == 0) {
+        // Re-throw exception if this is the first time we run creator
+        throw;
+      }
+    }
+
+    version_ = version;
+
+    auto newDependencies = dependencyRecorder.release();
+    dependencies_.withWLock([&](Dependencies& dependencies) {
+      for (const auto& dependency : newDependencies) {
+        if (!dependencies.count(dependency)) {
+          dependency->addDependent(this->shared_from_this());
+        }
+      }
+
+      for (const auto& dependency : dependencies) {
+        if (!newDependencies.count(dependency)) {
+          dependency->removeStaleDependents();
+        }
+      }
+
+      dependencies = std::move(newDependencies);
+    });
+  }
+
+  if (refreshDependents) {
+    auto dependents = dependents_.copy();
+
+    for (const auto& dependentWeak : dependents) {
+      if (auto dependent = dependentWeak.lock()) {
+        ObserverManager::scheduleRefresh(std::move(dependent), version);
+      }
+    }
+  }
+
+  return versionLastChange_;
+}
+
+Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
+    : creator_(std::move(creator)) {}
+
+Core::~Core() {
+  dependencies_.withWLock([](const Dependencies& dependencies) {
+    for (const auto& dependecy : dependencies) {
+      dependecy->removeStaleDependents();
+    }
+  });
+}
+
+Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
+  auto core = Core::Ptr(new Core(std::move(creator)));
+  return core;
+}
+
+void Core::addDependent(Core::WeakPtr dependent) {
+  dependents_.withWLock([&](Dependents& dependents) {
+    dependents.push_back(std::move(dependent));
+  });
+}
+
+void Core::removeStaleDependents() {
+  // This is inefficient, the assumption is that we won't have many dependents
+  dependents_.withWLock([](Dependents& dependents) {
+    for (size_t i = 0; i < dependents.size(); ++i) {
+      if (dependents[i].expired()) {
+        std::swap(dependents[i], dependents.back());
+        dependents.pop_back();
+        --i;
+      }
+    }
+  });
+}
+}
+}