e9312db566be4745619059a79b3a0488abd7a1ec
[folly.git] / folly / experimental / observer / detail / ObserverManager.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/experimental/observer/detail/ObserverManager.h>
17
18 #include <folly/ExceptionString.h>
19 #include <folly/Format.h>
20 #include <folly/MPMCQueue.h>
21 #include <folly/Range.h>
22 #include <folly/Singleton.h>
23 #include <folly/portability/GFlags.h>
24 #include <folly/system/ThreadName.h>
25
26 namespace folly {
27 namespace observer_detail {
28
29 FOLLY_TLS bool ObserverManager::inManagerThread_{false};
30 FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
31     ObserverManager::DependencyRecorder::currentDependencies_{nullptr};
32
33 DEFINE_int32(
34     observer_manager_pool_size,
35     4,
36     "How many internal threads ObserverManager should use");
37
38 static constexpr StringPiece kObserverManagerThreadNamePrefix{"ObserverMngr"};
39
40 namespace {
41 constexpr size_t kCurrentQueueSize{10 * 1024};
42 constexpr size_t kNextQueueSize{10 * 1024};
43 }
44
45 class ObserverManager::CurrentQueue {
46  public:
47   CurrentQueue() : queue_(kCurrentQueueSize) {
48     if (FLAGS_observer_manager_pool_size < 1) {
49       LOG(ERROR) << "--observer_manager_pool_size should be >= 1";
50       FLAGS_observer_manager_pool_size = 1;
51     }
52     for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) {
53       threads_.emplace_back([this, i]() {
54         folly::setThreadName(
55             folly::sformat("{}{}", kObserverManagerThreadNamePrefix, i));
56         ObserverManager::inManagerThread_ = true;
57
58         while (true) {
59           Function<void()> task;
60           queue_.blockingRead(task);
61
62           if (!task) {
63             return;
64           }
65
66           try {
67             task();
68           } catch (...) {
69             LOG(ERROR) << "Exception while running CurrentQueue task: "
70                        << exceptionStr(std::current_exception());
71           }
72         }
73       });
74     }
75   }
76
77   ~CurrentQueue() {
78     for (size_t i = 0; i < threads_.size(); ++i) {
79       queue_.blockingWrite(nullptr);
80     }
81
82     for (auto& thread : threads_) {
83       thread.join();
84     }
85
86     CHECK(queue_.isEmpty());
87   }
88
89   void add(Function<void()> task) {
90     if (ObserverManager::inManagerThread()) {
91       if (!queue_.write(std::move(task))) {
92         throw std::runtime_error("Too many Observers scheduled for update.");
93       }
94     } else {
95       queue_.blockingWrite(std::move(task));
96     }
97   }
98
99  private:
100   MPMCQueue<Function<void()>> queue_;
101   std::vector<std::thread> threads_;
102 };
103
104 class ObserverManager::NextQueue {
105  public:
106   explicit NextQueue(ObserverManager& manager)
107       : manager_(manager), queue_(kNextQueueSize) {
108     thread_ = std::thread([&]() {
109       Core::WeakPtr queueCoreWeak;
110
111       while (true) {
112         queue_.blockingRead(queueCoreWeak);
113         if (stop_) {
114           return;
115         }
116
117         std::vector<Core::Ptr> cores;
118         {
119           auto queueCore = queueCoreWeak.lock();
120           if (!queueCore) {
121             continue;
122           }
123           cores.emplace_back(std::move(queueCore));
124         }
125
126         {
127           SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
128
129           // We can't pick more tasks from the queue after we bumped the
130           // version, so we have to do this while holding the lock.
131           while (cores.size() < kNextQueueSize && queue_.read(queueCoreWeak)) {
132             if (stop_) {
133               return;
134             }
135             if (auto queueCore = queueCoreWeak.lock()) {
136               cores.emplace_back(std::move(queueCore));
137             }
138           }
139
140           ++manager_.version_;
141         }
142
143         for (auto& core : cores) {
144           manager_.scheduleRefresh(std::move(core), manager_.version_, true);
145         }
146       }
147     });
148   }
149
150   void add(Core::WeakPtr core) {
151     queue_.blockingWrite(std::move(core));
152   }
153
154   ~NextQueue() {
155     stop_ = true;
156     // Write to the queue to notify the thread.
157     queue_.blockingWrite(Core::WeakPtr());
158     thread_.join();
159   }
160
161  private:
162   ObserverManager& manager_;
163   MPMCQueue<Core::WeakPtr> queue_;
164   std::thread thread_;
165   std::atomic<bool> stop_{false};
166 };
167
168 ObserverManager::ObserverManager() {
169   currentQueue_ = std::make_unique<CurrentQueue>();
170   nextQueue_ = std::make_unique<NextQueue>(*this);
171 }
172
173 ObserverManager::~ObserverManager() {
174   // Destroy NextQueue, before the rest of this object, since it expects
175   // ObserverManager to be alive.
176   nextQueue_.reset();
177   currentQueue_.reset();
178 }
179
180 void ObserverManager::scheduleCurrent(Function<void()> task) {
181   currentQueue_->add(std::move(task));
182 }
183
184 void ObserverManager::scheduleNext(Core::WeakPtr core) {
185   nextQueue_->add(std::move(core));
186 }
187
188 struct ObserverManager::Singleton {
189   static folly::Singleton<ObserverManager> instance;
190   // MSVC 2015 doesn't let us access ObserverManager's constructor if we
191   // try to use a lambda to initialize instance, so we have to create
192   // an actual function instead.
193   static ObserverManager* createManager() {
194     return new ObserverManager();
195   }
196 };
197
198 folly::Singleton<ObserverManager> ObserverManager::Singleton::instance(
199     createManager);
200
201 std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
202   return Singleton::instance.try_get();
203 }
204 }
205 }