folly::Observer
[folly.git] / folly / experimental / observer / detail / ObserverManager.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/experimental/observer/detail/Core.h>
19 #include <folly/futures/Future.h>
20
21 namespace folly {
22 namespace observer_detail {
23
24 /**
25  * ObserverManager is a singleton which controls the re-computation of all
26  * Observers. Such re-computation always happens on the thread pool owned by
27  * ObserverManager.
28  *
29  * ObserverManager has global current version. All existing Observers
30  * may have their version be less (yet to be updated) or equal (up to date)
31  * to the global current version.
32  *
33  * ObserverManager::CurrentQueue contains all of the Observers which need to be
34  * updated to the global current version. Those updates are peformed on the
35  * ObserverManager's thread pool, until the queue is empty. If some Observer is
36  * updated, all of its dependents are added to ObserverManager::CurrentQueue
37  * to be updated.
38  *
39  * If some leaf Observer (i.e. created from Observable) is updated, then current
40  * version of the ObserverManager should be bumped. All such updated leaf
41  * Observers are added to the ObserverManager::NextQueue.
42  *
43  * *Only* when ObserverManager::CurrentQueue is empty, the global current
44  * version is bumped and all updates from the ObserverManager::NextQueue are
45  * performed. If leaf Observer gets updated more then once before being picked
46  * from the ObserverManager::NextQueue, then only the last update is processed.
47  */
48 class ObserverManager {
49  public:
50   static size_t getVersion() {
51     auto instance = getInstance();
52
53     if (!instance) {
54       return 1;
55     }
56
57     return instance->version_;
58   }
59
60   static bool inManagerThread() {
61     return inManagerThread_;
62   }
63
64   static Future<Unit>
65   scheduleRefresh(Core::Ptr core, size_t minVersion, bool force = false) {
66     if (core->getVersion() >= minVersion) {
67       return makeFuture<Unit>(Unit());
68     }
69
70     auto instance = getInstance();
71
72     if (!instance) {
73       return makeFuture<Unit>(
74           std::logic_error("ObserverManager requested during shutdown"));
75     }
76
77     Promise<Unit> promise;
78     auto future = promise.getFuture();
79
80     SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);
81
82     instance->scheduleCurrent([
83       core = std::move(core),
84       promise = std::move(promise),
85       instancePtr = instance.get(),
86       rh = std::move(rh),
87       force
88     ]() mutable {
89       promise.setWith([&]() { core->refresh(instancePtr->version_, force); });
90     });
91
92     return future;
93   }
94
95   static void scheduleRefreshNewVersion(Core::Ptr core) {
96     if (core->getVersion() == 0) {
97       scheduleRefresh(std::move(core), 1).get();
98       return;
99     }
100
101     auto instance = getInstance();
102
103     if (!instance) {
104       return;
105     }
106
107     instance->scheduleNext(std::move(core));
108   }
109
110   class DependencyRecorder {
111    public:
112     using Dependencies = std::unordered_set<Core::Ptr>;
113
114     DependencyRecorder() {
115       DCHECK(inManagerThread());
116
117       previousDepedencies_ = currentDependencies_;
118       currentDependencies_ = &dependencies_;
119     }
120
121     static void markDependency(Core::Ptr dependency) {
122       DCHECK(inManagerThread());
123       DCHECK(currentDependencies_);
124
125       currentDependencies_->insert(std::move(dependency));
126     }
127
128     Dependencies release() {
129       DCHECK(currentDependencies_ == &dependencies_);
130       std::swap(currentDependencies_, previousDepedencies_);
131       previousDepedencies_ = nullptr;
132
133       return std::move(dependencies_);
134     }
135
136     ~DependencyRecorder() {
137       if (previousDepedencies_) {
138         release();
139       }
140     }
141
142    private:
143     Dependencies dependencies_;
144     Dependencies* previousDepedencies_;
145
146     static FOLLY_TLS Dependencies* currentDependencies_;
147   };
148
149   ~ObserverManager();
150
151  private:
152   ObserverManager();
153
154   struct Singleton;
155
156   void scheduleCurrent(Function<void()>);
157   void scheduleNext(Core::Ptr);
158
159   class CurrentQueue;
160   class NextQueue;
161
162   std::unique_ptr<CurrentQueue> currentQueue_;
163   std::unique_ptr<NextQueue> nextQueue_;
164
165   static std::shared_ptr<ObserverManager> getInstance();
166   static FOLLY_TLS bool inManagerThread_;
167
168   /**
169    * Version mutex is used to make sure all updates are processed from the
170    * CurrentQueue, before bumping the version and moving to the NextQueue.
171    *
172    * To achieve this every task added to CurrentQueue holds a reader lock.
173    * NextQueue grabs a writer lock before bumping the version, so it can only
174    * happen if CurrentQueue is empty (notice that we use read-priority shared
175    * mutex).
176    */
177   SharedMutexReadPriority versionMutex_;
178   std::atomic<size_t> version_{1};
179 };
180 }
181 }