Fix copyright lines
[folly.git] / folly / experimental / observer / detail / Core.cpp
1 /*
2  * Copyright 2016-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/observer/detail/Core.h>
18
19 #include <folly/ExceptionString.h>
20 #include <folly/experimental/observer/detail/ObserverManager.h>
21
22 namespace folly {
23 namespace observer_detail {
24
25 Core::VersionedData Core::getData() {
26   if (!ObserverManager::inManagerThread()) {
27     return data_.copy();
28   }
29
30   ObserverManager::DependencyRecorder::markDependency(shared_from_this());
31
32   auto version = ObserverManager::getVersion();
33
34   if (version_ >= version) {
35     return data_.copy();
36   }
37
38   refresh(version);
39
40   DCHECK_GE(version_, version);
41   return data_.copy();
42 }
43
44 size_t Core::refresh(size_t version, bool force) {
45   CHECK(ObserverManager::inManagerThread());
46
47   ObserverManager::DependencyRecorder::markRefreshDependency(*this);
48   SCOPE_EXIT {
49     ObserverManager::DependencyRecorder::unmarkRefreshDependency(*this);
50   };
51
52   if (version_ >= version) {
53     return versionLastChange_;
54   }
55
56   {
57     std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
58
59     // Recheck in case this code was already refreshed
60     if (version_ >= version) {
61       return versionLastChange_;
62     }
63
64     bool needRefresh = force || version_ == 0;
65
66     ObserverManager::DependencyRecorder dependencyRecorder(*this);
67
68     // This can be run in parallel, but we expect most updates to propagate
69     // bottom to top.
70     dependencies_.withRLock([&](const Dependencies& dependencies) {
71       for (const auto& dependency : dependencies) {
72         try {
73           if (dependency->refresh(version) > version_) {
74             needRefresh = true;
75             break;
76           }
77         } catch (...) {
78           LOG(ERROR) << "Exception while checking dependencies for updates: "
79                      << exceptionStr(std::current_exception());
80
81           needRefresh = true;
82           break;
83         }
84       }
85     });
86
87     if (!needRefresh) {
88       version_ = version;
89       return versionLastChange_;
90     }
91
92     try {
93       {
94         VersionedData newData{creator_(), version};
95         if (!newData.data) {
96           throw std::logic_error("Observer creator returned nullptr.");
97         }
98         data_.swap(newData);
99       }
100
101       versionLastChange_ = version;
102     } catch (...) {
103       LOG(ERROR) << "Exception while refreshing Observer: "
104                  << exceptionStr(std::current_exception());
105
106       if (version_ == 0) {
107         // Re-throw exception if this is the first time we run creator
108         throw;
109       }
110     }
111
112     version_ = version;
113
114     if (versionLastChange_ != version) {
115       return versionLastChange_;
116     }
117
118     auto newDependencies = dependencyRecorder.release();
119     dependencies_.withWLock([&](Dependencies& dependencies) {
120       for (const auto& dependency : newDependencies) {
121         if (!dependencies.count(dependency)) {
122           dependency->addDependent(this->shared_from_this());
123         }
124       }
125
126       for (const auto& dependency : dependencies) {
127         if (!newDependencies.count(dependency)) {
128           dependency->removeStaleDependents();
129         }
130       }
131
132       dependencies = std::move(newDependencies);
133     });
134   }
135
136   auto dependents = dependents_.copy();
137
138   for (const auto& dependentWeak : dependents) {
139     if (auto dependent = dependentWeak.lock()) {
140       ObserverManager::scheduleRefresh(std::move(dependent), version);
141     }
142   }
143
144   return versionLastChange_;
145 }
146
147 Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
148     : creator_(std::move(creator)) {}
149
150 Core::~Core() {
151   dependencies_.withWLock([](const Dependencies& dependencies) {
152     for (const auto& dependecy : dependencies) {
153       dependecy->removeStaleDependents();
154     }
155   });
156 }
157
158 Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
159   auto core = Core::Ptr(new Core(std::move(creator)));
160   return core;
161 }
162
163 void Core::addDependent(Core::WeakPtr dependent) {
164   dependents_.withWLock([&](Dependents& dependents) {
165     dependents.push_back(std::move(dependent));
166   });
167 }
168
169 void Core::removeStaleDependents() {
170   // This is inefficient, the assumption is that we won't have many dependents
171   dependents_.withWLock([](Dependents& dependents) {
172     for (size_t i = 0; i < dependents.size(); ++i) {
173       if (dependents[i].expired()) {
174         std::swap(dependents[i], dependents.back());
175         dependents.pop_back();
176         --i;
177       }
178     }
179   });
180 }
181 } // namespace observer_detail
182 } // namespace folly