2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/experimental/observer/detail/Core.h>
17 #include <folly/experimental/observer/detail/ObserverManager.h>
20 namespace observer_detail {
// Returns a Core::VersionedData for this Core.
// NOTE(review): the embedded line numbers in this listing skip values, so the
// branch bodies and the return statements of this function are not visible
// here. The comments below describe only the statements that are visible.
22 Core::VersionedData Core::getData() {
// Branch taken when the caller is NOT running in the ObserverManager thread.
23 if (!ObserverManager::inManagerThread()) {
// Report this Core to the DependencyRecorder via markDependency().
27 ObserverManager::DependencyRecorder::markDependency(shared_from_this());
// Version currently reported by the ObserverManager.
29 auto version = ObserverManager::getVersion();
// Branch taken when version_ has already reached the manager's version.
31 if (version_ >= version) {
// DCHECK that version_ is not behind `version` by this point.
37 DCHECK_GE(version_, version);
// Brings this Core up to `version`.
//
// version: the version to refresh to; compared against version_ below.
// force:   when true, needRefresh starts out true regardless of version_.
// Returns: versionLastChange_ on every return path visible in this listing.
// Throws:  std::logic_error("Observer creator returned nullptr.") is thrown
//          below; the existing comment further down says an exception is
//          re-thrown when the creator runs for the first time.
//
// NOTE(review): the embedded line numbers in this listing skip values, so
// some statements (try/catch scaffolding, scope braces, several assignments)
// are not visible here. The comments describe only what is visible.
41 size_t Core::refresh(size_t version, bool force) {
// Must be called from the ObserverManager thread (enforced with CHECK).
42 CHECK(ObserverManager::inManagerThread());
// Mark / unmark this Core as a refresh dependency with the recorder.
// NOTE(review): the construct that decides when the unmark call runs is not
// visible in this listing.
44 ObserverManager::DependencyRecorder::markRefreshDependency(*this);
46 ObserverManager::DependencyRecorder::unmarkRefreshDependency(*this);
// Already at or past the requested version: report the last change version.
49 if (version_ >= version) {
50 return versionLastChange_;
// Hold refreshMutex_ for the refresh work that follows.
54 std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
56 // Recheck in case this code was already refreshed
57 if (version_ >= version) {
58 return versionLastChange_;
// A refresh is needed up front when forced or when version_ is still 0
// (presumably: the creator has never run for this Core -- confirm).
61 bool needRefresh = force || version_ == 0;
// Recorder constructed for this Core; its result is taken via release() below.
63 ObserverManager::DependencyRecorder dependencyRecorder(*this);
65 // This can be run in parallel, but we expect most updates to propagate
// Walk the current dependencies under the read lock (withRLock).
67 dependencies_.withRLock([&](const Dependencies& dependencies) {
68 for (const auto& dependency : dependencies) {
// Refresh the dependency to `version` and compare the value it returns
// with this Core's version_.
70 if (dependency->refresh(version) > version_) {
// Log the in-flight exception raised while checking dependencies.
75 LOG(ERROR) << "Exception while checking dependencies for updates: "
76 << exceptionStr(std::current_exception());
86 return versionLastChange_;
// Invoke the creator and pair its result with `version`.
91 VersionedData newData{creator_(), version};
93 throw std::logic_error("Observer creator returned nullptr.");
// Record `version` as the version of the last change.
98 versionLastChange_ = version;
// Log the in-flight exception raised while refreshing.
100 LOG(ERROR) << "Exception while refreshing Observer: "
101 << exceptionStr(std::current_exception());
104 // Re-throw exception if this is the first time we run creator
// versionLastChange_ was not set to `version` above: return without touching
// dependencies or dependents.
111 if (versionLastChange_ != version) {
112 return versionLastChange_;
// Take the dependency set collected by the recorder.
115 auto newDependencies = dependencyRecorder.release();
// Swap in the new dependency set under the write lock (withWLock).
116 dependencies_.withWLock([&](Dependencies& dependencies) {
// Newly acquired dependencies: register this Core as their dependent.
117 for (const auto& dependency : newDependencies) {
118 if (!dependencies.count(dependency)) {
119 dependency->addDependent(this->shared_from_this());
// Dependencies absent from the new set: have them run removeStaleDependents().
123 for (const auto& dependency : dependencies) {
124 if (!newDependencies.count(dependency)) {
125 dependency->removeStaleDependents();
// Replace the stored set with the new one.
129 dependencies = std::move(newDependencies);
// Work on a copy of the dependents list.
133 auto dependents = dependents_.copy();
// Schedule a refresh to `version` for every dependent whose weak pointer
// can still be locked.
135 for (const auto& dependentWeak : dependents) {
136 if (auto dependent = dependentWeak.lock()) {
137 ObserverManager::scheduleRefresh(std::move(dependent), version);
141 return versionLastChange_;
// Constructs a Core that takes ownership of `creator` (moved into creator_).
// creator: callable returning std::shared_ptr<const void>.
144 Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
145 : creator_(std::move(creator)) {}
// NOTE(review): the header of the definition that encloses these statements
// is not visible in this listing (the embedded line numbers skip it).
// Visible behaviour: under the write lock on dependencies_, call
// removeStaleDependents() on every dependency. The callback receives the set
// as const and does not modify it.
148 dependencies_.withWLock([](const Dependencies& dependencies) {
149 for (const auto& dependecy : dependencies) {
150 dependecy->removeStaleDependents();
// Factory: allocates a new Core that takes ownership of `creator` and wraps
// it in a Core::Ptr.
// NOTE(review): the return statement is not visible in this listing.
155 Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
156 auto core = Core::Ptr(new Core(std::move(creator)));
// Appends `dependent` (a Core::WeakPtr) to dependents_ under the write lock.
// No duplicate check is performed here.
160 void Core::addDependent(Core::WeakPtr dependent) {
161 dependents_.withWLock([&](Dependents& dependents) {
162 dependents.push_back(std::move(dependent));
// Drops expired weak pointers from dependents_ under the write lock.
// Removal swaps the expired entry with the last element and pops the back,
// so the relative order of the remaining dependents is not preserved.
166 void Core::removeStaleDependents() {
167 // This is inefficient, the assumption is that we won't have many dependents
168 dependents_.withWLock([](Dependents& dependents) {
169 for (size_t i = 0; i < dependents.size(); ++i) {
170 if (dependents[i].expired()) {
171 std::swap(dependents[i], dependents.back());
172 dependents.pop_back();
// NOTE(review): after the swap, slot i holds an element that has not been
// checked yet. No index adjustment is visible in this listing -- confirm that
// the full source re-examines slot i before ++i advances past it.