folly::Observer
[folly.git] / folly / experimental / observer / detail / Core.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/experimental/observer/detail/Core.h>
17 #include <folly/experimental/observer/detail/ObserverManager.h>
18
19 namespace folly {
20 namespace observer_detail {
21
22 Core::VersionedData Core::getData() {
23   if (!ObserverManager::inManagerThread()) {
24     return data_.copy();
25   }
26
27   ObserverManager::DependencyRecorder::markDependency(shared_from_this());
28
29   auto version = ObserverManager::getVersion();
30
31   if (version_ >= version) {
32     return data_.copy();
33   }
34
35   refresh(version);
36
37   DCHECK_GE(version_, version);
38   return data_.copy();
39 }
40
41 size_t Core::refresh(size_t version, bool force) {
42   CHECK(ObserverManager::inManagerThread());
43
44   if (version_ >= version) {
45     return versionLastChange_;
46   }
47
48   bool refreshDependents = false;
49
50   {
51     std::lock_guard<std::mutex> lgRefresh(refreshMutex_);
52
53     // Recheck in case this code was already refreshed
54     if (version_ >= version) {
55       return versionLastChange_;
56     }
57
58     bool needRefresh = force || version_ == 0;
59
60     // This can be run in parallel, but we expect most updates to propagate
61     // bottom to top.
62     dependencies_.withRLock([&](const Dependencies& dependencies) {
63       for (const auto& dependency : dependencies) {
64         if (dependency->refresh(version) > version_) {
65           needRefresh = true;
66           break;
67         }
68       }
69     });
70
71     if (!needRefresh) {
72       version_ = version;
73       return versionLastChange_;
74     }
75
76     ObserverManager::DependencyRecorder dependencyRecorder;
77
78     try {
79       {
80         VersionedData newData{creator_(), version};
81         if (!newData.data) {
82           throw std::logic_error("Observer creator returned nullptr.");
83         }
84         data_.swap(newData);
85       }
86
87       versionLastChange_ = version;
88       refreshDependents = true;
89     } catch (...) {
90       LOG(ERROR) << "Exception while refreshing Observer: "
91                  << exceptionStr(std::current_exception());
92
93       if (version_ == 0) {
94         // Re-throw exception if this is the first time we run creator
95         throw;
96       }
97     }
98
99     version_ = version;
100
101     auto newDependencies = dependencyRecorder.release();
102     dependencies_.withWLock([&](Dependencies& dependencies) {
103       for (const auto& dependency : newDependencies) {
104         if (!dependencies.count(dependency)) {
105           dependency->addDependent(this->shared_from_this());
106         }
107       }
108
109       for (const auto& dependency : dependencies) {
110         if (!newDependencies.count(dependency)) {
111           dependency->removeStaleDependents();
112         }
113       }
114
115       dependencies = std::move(newDependencies);
116     });
117   }
118
119   if (refreshDependents) {
120     auto dependents = dependents_.copy();
121
122     for (const auto& dependentWeak : dependents) {
123       if (auto dependent = dependentWeak.lock()) {
124         ObserverManager::scheduleRefresh(std::move(dependent), version);
125       }
126     }
127   }
128
129   return versionLastChange_;
130 }
131
132 Core::Core(folly::Function<std::shared_ptr<const void>()> creator)
133     : creator_(std::move(creator)) {}
134
135 Core::~Core() {
136   dependencies_.withWLock([](const Dependencies& dependencies) {
137     for (const auto& dependecy : dependencies) {
138       dependecy->removeStaleDependents();
139     }
140   });
141 }
142
143 Core::Ptr Core::create(folly::Function<std::shared_ptr<const void>()> creator) {
144   auto core = Core::Ptr(new Core(std::move(creator)));
145   return core;
146 }
147
148 void Core::addDependent(Core::WeakPtr dependent) {
149   dependents_.withWLock([&](Dependents& dependents) {
150     dependents.push_back(std::move(dependent));
151   });
152 }
153
154 void Core::removeStaleDependents() {
155   // This is inefficient, the assumption is that we won't have many dependents
156   dependents_.withWLock([](Dependents& dependents) {
157     for (size_t i = 0; i < dependents.size(); ++i) {
158       if (dependents[i].expired()) {
159         std::swap(dependents[i], dependents.back());
160         dependents.pop_back();
161         --i;
162       }
163     }
164   });
165 }
166 }
167 }