folly::Observer
[folly.git] / folly / experimental / observer / detail / ObserverManager.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/experimental/observer/detail/ObserverManager.h>
17
18 #include <folly/MPMCQueue.h>
19 #include <folly/Singleton.h>
20
21 namespace folly {
22 namespace observer_detail {
23
24 FOLLY_TLS bool ObserverManager::inManagerThread_{false};
25 FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
26     ObserverManager::DependencyRecorder::currentDependencies_{nullptr};
27
28 namespace {
29 constexpr size_t kCurrentThreadPoolSize{4};
30 constexpr size_t kCurrentQueueSize{10 * 1024};
31 constexpr size_t kNextQueueSize{10 * 1024};
32 }
33
34 class ObserverManager::CurrentQueue {
35  public:
36   CurrentQueue() : queue_(kCurrentQueueSize) {
37     for (size_t i = 0; i < kCurrentThreadPoolSize; ++i) {
38       threads_.emplace_back([&]() {
39         ObserverManager::inManagerThread_ = true;
40
41         while (true) {
42           Function<void()> task;
43           queue_.blockingRead(task);
44
45           if (!task) {
46             return;
47           }
48
49           try {
50             task();
51           } catch (...) {
52             LOG(ERROR) << "Exception while running CurrentQueue task: "
53                        << exceptionStr(std::current_exception());
54           }
55         }
56       });
57     }
58   }
59
60   ~CurrentQueue() {
61     for (size_t i = 0; i < threads_.size(); ++i) {
62       queue_.blockingWrite(nullptr);
63     }
64
65     for (auto& thread : threads_) {
66       thread.join();
67     }
68
69     CHECK(queue_.isEmpty());
70   }
71
72   void add(Function<void()> task) {
73     if (ObserverManager::inManagerThread()) {
74       if (!queue_.write(std::move(task))) {
75         throw std::runtime_error("Too many Observers scheduled for update.");
76       }
77     } else {
78       queue_.blockingWrite(std::move(task));
79     }
80   }
81
82  private:
83   MPMCQueue<Function<void()>> queue_;
84   std::vector<std::thread> threads_;
85 };
86
87 class ObserverManager::NextQueue {
88  public:
89   explicit NextQueue(ObserverManager& manager)
90       : manager_(manager), queue_(kNextQueueSize) {
91     thread_ = std::thread([&]() {
92       Core::Ptr queueCore;
93
94       while (true) {
95         queue_.blockingRead(queueCore);
96
97         if (!queueCore) {
98           return;
99         }
100
101         std::vector<Core::Ptr> cores;
102         cores.emplace_back(std::move(queueCore));
103
104         {
105           SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
106
107           // We can't pick more tasks from the queue after we bumped the
108           // version, so we have to do this while holding the lock.
109           while (cores.size() < kNextQueueSize && queue_.read(queueCore)) {
110             if (!queueCore) {
111               return;
112             }
113             cores.emplace_back(std::move(queueCore));
114           }
115
116           ++manager_.version_;
117         }
118
119         for (auto& core : cores) {
120           manager_.scheduleRefresh(std::move(core), manager_.version_, true);
121         }
122       }
123     });
124   }
125
126   void add(Core::Ptr core) {
127     queue_.blockingWrite(std::move(core));
128   }
129
130   ~NextQueue() {
131     // Emtpy element signals thread to terminate
132     queue_.blockingWrite(nullptr);
133     thread_.join();
134   }
135
136  private:
137   ObserverManager& manager_;
138   MPMCQueue<Core::Ptr> queue_;
139   std::thread thread_;
140 };
141
142 ObserverManager::ObserverManager() {
143   currentQueue_ = make_unique<CurrentQueue>();
144   nextQueue_ = make_unique<NextQueue>(*this);
145 }
146
147 ObserverManager::~ObserverManager() {
148   // Destroy NextQueue, before the rest of this object, since it expects
149   // ObserverManager to be alive.
150   nextQueue_.release();
151   currentQueue_.release();
152 }
153
154 void ObserverManager::scheduleCurrent(Function<void()> task) {
155   currentQueue_->add(std::move(task));
156 }
157
158 void ObserverManager::scheduleNext(Core::Ptr core) {
159   nextQueue_->add(std::move(core));
160 }
161
162 struct ObserverManager::Singleton {
163   static folly::Singleton<ObserverManager> instance;
164 };
165
166 folly::Singleton<ObserverManager> ObserverManager::Singleton::instance([] {
167   return new ObserverManager();
168 });
169
170 std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
171   return Singleton::instance.try_get();
172 }
173 }
174 }