2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/experimental/observer/detail/Core.h>
19 #include <folly/futures/Future.h>
22 namespace observer_detail {
25 * ObserverManager is a singleton which controls the re-computation of all
26 * Observers. Such re-computation always happens on the thread pool owned by ObserverManager.
29 * ObserverManager has a global current version. Every existing Observer has
30 * a version that is either less than (yet to be updated) or equal to (up to
31 * date with) the global current version.
33 * ObserverManager::CurrentQueue contains all of the Observers which need to be
34 * updated to the global current version. Those updates are performed on the
35 * ObserverManager's thread pool, until the queue is empty. If some Observer is
36 * updated, all of its dependents are added to ObserverManager::CurrentQueue as well.
39 * If some leaf Observer (i.e. one created from an Observable) is updated, then
40 * the current version of the ObserverManager should be bumped. All such updated
41 * leaf Observers are added to the ObserverManager::NextQueue.
43 * *Only* when ObserverManager::CurrentQueue is empty, the global current
44 * version is bumped and all updates from the ObserverManager::NextQueue are
45 * performed. If a leaf Observer gets updated more than once before being picked
46 * from the ObserverManager::NextQueue, then only the last update is processed.
48 class ObserverManager {
// Returns the value of the ObserverManager instance's version_ counter.
// NOTE(review): the lines between obtaining the instance and the return are
// not visible in this excerpt — presumably a null-instance check; confirm.
50 static size_t getVersion() {
// getInstance() hands back a std::shared_ptr<ObserverManager>.
51 auto instance = getInstance();
57 return instance->version_;
// Returns the inManagerThread_ flag, which is declared with FOLLY_TLS in this
// class. DependencyRecorder DCHECKs this before touching its recording state.
60 static bool inManagerThread() {
61 return inManagerThread_;
// Schedules a refresh of `core` through scheduleCurrent() and produces a
// future associated with that refresh.
//   core       - the Core to refresh; moved into the scheduled task.
//   minVersion - if core's version is already >= this, nothing is scheduled.
//   force      - forwarded unchanged to Core::refresh().
// NOTE(review): the return-type line and several interior lines are not
// visible in this excerpt; the comments below cover only what is shown.
65 scheduleRefresh(Core::Ptr core, size_t minVersion, bool force = false) {
// Fast path: core is already at or past minVersion, so no work is queued.
66 if (core->getVersion() >= minVersion) {
67 return makeFuture<Unit>(Unit());
70 auto instance = getInstance();
// Failure path: a future built from std::logic_error. The guarding condition
// is not visible here — presumably getInstance() returned null; confirm.
73 return makeFuture<Unit>(
74 std::logic_error("ObserverManager requested during shutdown"));
// The promise is moved into the scheduled task below; `future` is obtained
// first (presumably it is what gets returned to the caller — confirm).
77 Promise<Unit> promise;
78 auto future = promise.getFuture();
// Take a read hold on versionMutex_. The comment on versionMutex_ in this
// class says every task added to CurrentQueue holds a reader lock.
80 SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);
82 instance->scheduleCurrent([
83 core = std::move(core),
84 promise = std::move(promise),
85 instancePtr = instance.get(),
// Task body: fulfil the promise from a callable that refreshes core to the
// manager's version_ as read when the task runs, passing `force` along.
89 promise.setWith([&]() { core->refresh(instancePtr->version_, force); });
// Queues `core` for processing via scheduleNext(). A core whose version is
// still 0 is first refreshed through scheduleRefresh() with minVersion 1.
// NOTE(review): the lines between the two branches are not visible here, so
// whether the version-0 branch returns early cannot be confirmed from this
// excerpt.
95 static void scheduleRefreshNewVersion(Core::Ptr core) {
96 if (core->getVersion() == 0) {
// .get() waits on the future returned by scheduleRefresh().
97 scheduleRefresh(std::move(core), 1).get();
101 auto instance = getInstance();
107 instance->scheduleNext(std::move(core));
// Collects the Cores passed to markDependency() while this recorder is the
// active one. Recorders nest: each one remembers the set that was active
// before it and release() restores that set.
// NOTE(review): access specifiers and the destructor body are not visible in
// this excerpt. The member name `previousDepedencies_` is misspelled (sic).
110 class DependencyRecorder {
112 using Dependencies = std::unordered_set<Core::Ptr>;
// Makes this recorder's set the active one, saving whichever set was active
// before. DCHECKs that the caller is on a manager thread.
114 DependencyRecorder() {
115 DCHECK(inManagerThread());
117 previousDepedencies_ = currentDependencies_;
118 currentDependencies_ = &dependencies_;
// Inserts `dependency` into the currently active set. DCHECKs that the caller
// is on a manager thread and that some recorder is active.
121 static void markDependency(Core::Ptr dependency) {
122 DCHECK(inManagerThread());
123 DCHECK(currentDependencies_);
125 currentDependencies_->insert(std::move(dependency));
// Stops recording and returns the collected set. DCHECKs that this recorder
// is the active one.
128 Dependencies release() {
129 DCHECK(currentDependencies_ == &dependencies_);
// After the swap currentDependencies_ holds the saved earlier pointer and
// previousDepedencies_ holds &dependencies_, which is then cleared.
130 std::swap(currentDependencies_, previousDepedencies_);
131 previousDepedencies_ = nullptr;
// dependencies_ is a member, so the explicit move is what avoids a copy here.
133 return std::move(dependencies_);
// NOTE(review): only the null check on previousDepedencies_ is visible; the
// action taken when it is non-null is outside this excerpt.
136 ~DependencyRecorder() {
137 if (previousDepedencies_) {
// The set filled in while this recorder is active.
143 Dependencies dependencies_;
// Value of currentDependencies_ saved by the constructor; nulled by release().
144 Dependencies* previousDepedencies_;
// Set that markDependency() inserts into; declared with FOLLY_TLS.
146 static FOLLY_TLS Dependencies* currentDependencies_;
// Takes a callable to schedule; scheduleRefresh() passes a task that refreshes
// a Core. Defined outside this excerpt — presumably enqueues onto
// currentQueue_; confirm.
156 void scheduleCurrent(Function<void()>);
// Takes a Core to schedule; called from scheduleRefreshNewVersion(). Defined
// outside this excerpt — presumably enqueues onto nextQueue_; confirm.
157 void scheduleNext(Core::Ptr);
// Owning pointers to the two queues. CurrentQueue and NextQueue are declared
// outside this excerpt.
162 std::unique_ptr<CurrentQueue> currentQueue_;
163 std::unique_ptr<NextQueue> nextQueue_;
// Returns a shared_ptr to the ObserverManager instance. Defined outside this
// excerpt; NOTE(review): callers here appear to guard against a failed lookup
// (see the "requested during shutdown" error in scheduleRefresh) — confirm it
// can return null.
165 static std::shared_ptr<ObserverManager> getInstance();
// Flag returned by inManagerThread(); declared with FOLLY_TLS.
166 static FOLLY_TLS bool inManagerThread_;
169 * Version mutex is used to make sure all updates are processed from the
170 * CurrentQueue, before bumping the version and moving to the NextQueue.
172 * To achieve this, every task added to CurrentQueue holds a reader lock.
173 * NextQueue grabs a writer lock before bumping the version, so it can only
174 * happen if CurrentQueue is empty (notice that we use a read-priority shared mutex).
177 SharedMutexReadPriority versionMutex_;
// Global current version; starts at 1. Read by getVersion() and by the refresh
// task created in scheduleRefresh().
178 std::atomic<size_t> version_{1};