--- /dev/null
+/*
+ * Copyright 2016 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/observer/detail/Core.h>
+#include <folly/experimental/observer/detail/ObserverManager.h>
+
+namespace folly {
+namespace observer_detail {
+
+Core::VersionedData Core::getData() {
+ if (!ObserverManager::inManagerThread()) {
+ return data_.copy();
+ }
+
+ ObserverManager::DependencyRecorder::markDependency(shared_from_this());
+
+ auto version = ObserverManager::getVersion();
+
+ if (version_ >= version) {
+ return data_.copy();
+ }
+
+ refresh(version);
+
+ DCHECK_GE(version_, version);
+ return data_.copy();
+}
+
+size_t Core::refresh(size_t version, bool force) {
+ CHECK(ObserverManager::inManagerThread());
+
+ if (version_ >= version) {
+ return versionLastChange_;
+ }
+
+ bool refreshDependents = false;
+
+ {
+ std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
+
+ // Recheck in case this code was already refreshed
+ if (version_ >= version) {
+ return versionLastChange_;
+ }
+
+ bool needRefresh = force || version_ == 0;
+
+ // This can be run in parallel, but we expect most updates to propagate
+ // bottom to top.
+ dependencies_.withRLock([&](const Dependencies& dependencies) {
+ for (const auto& dependency : dependencies) {
+ if (dependency->refresh(version) > version_) {
+ needRefresh = true;
+ break;
+ }
+ }
+ });
+
+ if (!needRefresh) {
+ version_ = version;
+ return versionLastChange_;
+ }
+
+ ObserverManager::DependencyRecorder dependencyRecorder;
+
+ try {
+ {
+ VersionedData newData{creator_(), version};
+ if (!newData.data) {
+ throw std::logic_error("Observer creator returned nullptr.");
+ }
+ data_.swap(newData);
+ }
+
+ versionLastChange_ = version;
+ refreshDependents = true;
+ } catch (...) {
+ LOG(ERROR) << "Exception while refreshing Observer: "
+ << exceptionStr(std::current_exception());
+
+ if (version_ == 0) {
+ // Re-throw exception if this is the first time we run creator
+ throw;
+ }
+ }
+
+ version_ = version;
+
+ auto newDependencies = dependencyRecorder.release();
+ dependencies_.withWLock([&](Dependencies& dependencies) {
+ for (const auto& dependency : newDependencies) {
+ if (!dependencies.count(dependency)) {
+ dependency->addDependent(this->shared_from_this());
+ }
+ }
+
+ for (const auto& dependency : dependencies) {
+ if (!newDependencies.count(dependency)) {
+ dependency->removeStaleDependents();
+ }
+ }
+
+ dependencies = std::move(newDependencies);
+ });
+ }
+
+ if (refreshDependents) {
+ auto dependents = dependents_.copy();
+
+ for (const auto& dependentWeak : dependents) {
+ if (auto dependent = dependentWeak.lock()) {
+ ObserverManager::scheduleRefresh(std::move(dependent), version);
+ }
+ }
+ }
+
+ return versionLastChange_;
+}
+
+Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
+ : creator_(std::move(creator)) {}
+
+Core::~Core() {
+ dependencies_.withWLock([](const Dependencies& dependencies) {
+ for (const auto& dependecy : dependencies) {
+ dependecy->removeStaleDependents();
+ }
+ });
+}
+
+Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
+ auto core = Core::Ptr(new Core(std::move(creator)));
+ return core;
+}
+
+void Core::addDependent(Core::WeakPtr dependent) {
+ dependents_.withWLock([&](Dependents& dependents) {
+ dependents.push_back(std::move(dependent));
+ });
+}
+
+void Core::removeStaleDependents() {
+ // This is inefficient, the assumption is that we won't have many dependents
+ dependents_.withWLock([](Dependents& dependents) {
+ for (size_t i = 0; i < dependents.size(); ++i) {
+ if (dependents[i].expired()) {
+ std::swap(dependents[i], dependents.back());
+ dependents.pop_back();
+ --i;
+ }
+ }
+ });
+}
+}
+}