Fix a race in Observable context destruction
[folly.git] / folly / experimental / observer / detail / ObserverManager.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/experimental/observer/detail/Core.h>
19 #include <folly/experimental/observer/detail/GraphCycleDetector.h>
20 #include <folly/futures/Future.h>
21
22 namespace folly {
23 namespace observer_detail {
24
25 /**
26  * ObserverManager is a singleton which controls the re-computation of all
27  * Observers. Such re-computation always happens on the thread pool owned by
28  * ObserverManager.
29  *
30  * ObserverManager has global current version. All existing Observers
31  * may have their version be less (yet to be updated) or equal (up to date)
32  * to the global current version.
33  *
34  * ObserverManager::CurrentQueue contains all of the Observers which need to be
35  * updated to the global current version. Those updates are peformed on the
36  * ObserverManager's thread pool, until the queue is empty. If some Observer is
37  * updated, all of its dependents are added to ObserverManager::CurrentQueue
38  * to be updated.
39  *
40  * If some leaf Observer (i.e. created from Observable) is updated, then current
41  * version of the ObserverManager should be bumped. All such updated leaf
42  * Observers are added to the ObserverManager::NextQueue.
43  *
44  * *Only* when ObserverManager::CurrentQueue is empty, the global current
45  * version is bumped and all updates from the ObserverManager::NextQueue are
46  * performed. If leaf Observer gets updated more then once before being picked
47  * from the ObserverManager::NextQueue, then only the last update is processed.
48  */
49 class ObserverManager {
50  public:
51   static size_t getVersion() {
52     auto instance = getInstance();
53
54     if (!instance) {
55       return 1;
56     }
57
58     return instance->version_;
59   }
60
61   static bool inManagerThread() {
62     return inManagerThread_;
63   }
64
65   static Future<Unit>
66   scheduleRefresh(Core::Ptr core, size_t minVersion, bool force = false) {
67     if (core->getVersion() >= minVersion) {
68       return makeFuture<Unit>(Unit());
69     }
70
71     auto instance = getInstance();
72
73     if (!instance) {
74       return makeFuture<Unit>(
75           std::logic_error("ObserverManager requested during shutdown"));
76     }
77
78     Promise<Unit> promise;
79     auto future = promise.getFuture();
80
81     SharedMutexReadPriority::ReadHolder rh(instance->versionMutex_);
82
83     instance->scheduleCurrent([
84       core = std::move(core),
85       promise = std::move(promise),
86       instancePtr = instance.get(),
87       rh = std::move(rh),
88       force
89     ]() mutable {
90       promise.setWith([&]() { core->refresh(instancePtr->version_, force); });
91     });
92
93     return future;
94   }
95
96   static void scheduleRefreshNewVersion(Core::WeakPtr coreWeak) {
97     auto instance = getInstance();
98
99     if (!instance) {
100       return;
101     }
102
103     instance->scheduleNext(std::move(coreWeak));
104   }
105
106   static void initCore(Core::Ptr core) {
107     DCHECK(core->getVersion() == 0);
108     scheduleRefresh(std::move(core), 1).get();
109   }
110
111   class DependencyRecorder {
112    public:
113     using DependencySet = std::unordered_set<Core::Ptr>;
114     struct Dependencies {
115       explicit Dependencies(const Core& core_) : core(core_) {}
116
117       DependencySet dependencies;
118       const Core& core;
119     };
120
121     explicit DependencyRecorder(const Core& core) : dependencies_(core) {
122       DCHECK(inManagerThread());
123
124       previousDepedencies_ = currentDependencies_;
125       currentDependencies_ = &dependencies_;
126     }
127
128     static void markDependency(Core::Ptr dependency) {
129       DCHECK(inManagerThread());
130       DCHECK(currentDependencies_);
131
132       currentDependencies_->dependencies.insert(std::move(dependency));
133     }
134
135     static void markRefreshDependency(const Core& core) {
136       if (!currentDependencies_) {
137         return;
138       }
139
140       if (auto instance = getInstance()) {
141         instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
142           bool hasCycle =
143               !cycleDetector.addEdge(&currentDependencies_->core, &core);
144           if (hasCycle) {
145             throw std::logic_error("Observer cycle detected.");
146           }
147         });
148       }
149     }
150
151     static void unmarkRefreshDependency(const Core& core) {
152       if (!currentDependencies_) {
153         return;
154       }
155
156       if (auto instance = getInstance()) {
157         instance->cycleDetector_.withLock([&](CycleDetector& cycleDetector) {
158           cycleDetector.removeEdge(&currentDependencies_->core, &core);
159         });
160       }
161     }
162
163     DependencySet release() {
164       DCHECK(currentDependencies_ == &dependencies_);
165       std::swap(currentDependencies_, previousDepedencies_);
166       previousDepedencies_ = nullptr;
167
168       return std::move(dependencies_.dependencies);
169     }
170
171     ~DependencyRecorder() {
172       if (currentDependencies_ == &dependencies_) {
173         release();
174       }
175     }
176
177    private:
178     Dependencies dependencies_;
179     Dependencies* previousDepedencies_;
180
181     static FOLLY_TLS Dependencies* currentDependencies_;
182   };
183
184   ~ObserverManager();
185
186  private:
187   ObserverManager();
188
189   struct Singleton;
190
191   void scheduleCurrent(Function<void()>);
192   void scheduleNext(Core::WeakPtr);
193
194   class CurrentQueue;
195   class NextQueue;
196
197   std::unique_ptr<CurrentQueue> currentQueue_;
198   std::unique_ptr<NextQueue> nextQueue_;
199
200   static std::shared_ptr<ObserverManager> getInstance();
201   static FOLLY_TLS bool inManagerThread_;
202
203   /**
204    * Version mutex is used to make sure all updates are processed from the
205    * CurrentQueue, before bumping the version and moving to the NextQueue.
206    *
207    * To achieve this every task added to CurrentQueue holds a reader lock.
208    * NextQueue grabs a writer lock before bumping the version, so it can only
209    * happen if CurrentQueue is empty (notice that we use read-priority shared
210    * mutex).
211    */
212   SharedMutexReadPriority versionMutex_;
213   std::atomic<size_t> version_{1};
214
215   using CycleDetector = GraphCycleDetector<const Core*>;
216   folly::Synchronized<CycleDetector, std::mutex> cycleDetector_;
217 };
218 }
219 }