Cycle detection
[folly.git] / folly / experimental / observer / detail / Core.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/experimental/observer/detail/Core.h>
17 #include <folly/experimental/observer/detail/ObserverManager.h>
18
19 namespace folly {
20 namespace observer_detail {
21
22 Core::VersionedData Core::getData() {
23   if (!ObserverManager::inManagerThread()) {
24     return data_.copy();
25   }
26
27   ObserverManager::DependencyRecorder::markDependency(shared_from_this());
28
29   auto version = ObserverManager::getVersion();
30
31   if (version_ >= version) {
32     return data_.copy();
33   }
34
35   refresh(version);
36
37   DCHECK_GE(version_, version);
38   return data_.copy();
39 }
40
41 size_t Core::refresh(size_t version, bool force) {
42   CHECK(ObserverManager::inManagerThread());
43
44   ObserverManager::DependencyRecorder::markRefreshDependency(*this);
45   SCOPE_EXIT {
46     ObserverManager::DependencyRecorder::unmarkRefreshDependency(*this);
47   };
48
49   if (version_ >= version) {
50     return versionLastChange_;
51   }
52
53   {
54     std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
55
56     // Recheck in case this code was already refreshed
57     if (version_ >= version) {
58       return versionLastChange_;
59     }
60
61     bool needRefresh = force || version_ == 0;
62
63     ObserverManager::DependencyRecorder dependencyRecorder(*this);
64
65     // This can be run in parallel, but we expect most updates to propagate
66     // bottom to top.
67     dependencies_.withRLock([&](const Dependencies& dependencies) {
68       for (const auto& dependency : dependencies) {
69         try {
70           if (dependency->refresh(version) > version_) {
71             needRefresh = true;
72             break;
73           }
74         } catch (...) {
75           LOG(ERROR) << "Exception while checking dependencies for updates: "
76                      << exceptionStr(std::current_exception());
77
78           needRefresh = true;
79           break;
80         }
81       }
82     });
83
84     if (!needRefresh) {
85       version_ = version;
86       return versionLastChange_;
87     }
88
89     try {
90       {
91         VersionedData newData{creator_(), version};
92         if (!newData.data) {
93           throw std::logic_error("Observer creator returned nullptr.");
94         }
95         data_.swap(newData);
96       }
97
98       versionLastChange_ = version;
99     } catch (...) {
100       LOG(ERROR) << "Exception while refreshing Observer: "
101                  << exceptionStr(std::current_exception());
102
103       if (version_ == 0) {
104         // Re-throw exception if this is the first time we run creator
105         throw;
106       }
107     }
108
109     version_ = version;
110
111     if (versionLastChange_ != version) {
112       return versionLastChange_;
113     }
114
115     auto newDependencies = dependencyRecorder.release();
116     dependencies_.withWLock([&](Dependencies& dependencies) {
117       for (const auto& dependency : newDependencies) {
118         if (!dependencies.count(dependency)) {
119           dependency->addDependent(this->shared_from_this());
120         }
121       }
122
123       for (const auto& dependency : dependencies) {
124         if (!newDependencies.count(dependency)) {
125           dependency->removeStaleDependents();
126         }
127       }
128
129       dependencies = std::move(newDependencies);
130     });
131   }
132
133   auto dependents = dependents_.copy();
134
135   for (const auto& dependentWeak : dependents) {
136     if (auto dependent = dependentWeak.lock()) {
137       ObserverManager::scheduleRefresh(std::move(dependent), version);
138     }
139   }
140
141   return versionLastChange_;
142 }
143
144 Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
145     : creator_(std::move(creator)) {}
146
147 Core::~Core() {
148   dependencies_.withWLock([](const Dependencies& dependencies) {
149     for (const auto& dependecy : dependencies) {
150       dependecy->removeStaleDependents();
151     }
152   });
153 }
154
155 Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
156   auto core = Core::Ptr(new Core(std::move(creator)));
157   return core;
158 }
159
160 void Core::addDependent(Core::WeakPtr dependent) {
161   dependents_.withWLock([&](Dependents& dependents) {
162     dependents.push_back(std::move(dependent));
163   });
164 }
165
166 void Core::removeStaleDependents() {
167   // This is inefficient, the assumption is that we won't have many dependents
168   dependents_.withWLock([](Dependents& dependents) {
169     for (size_t i = 0; i < dependents.size(); ++i) {
170       if (dependents[i].expired()) {
171         std::swap(dependents[i], dependents.back());
172         dependents.pop_back();
173         --i;
174       }
175     }
176   });
177 }
178 }
179 }