Revert D4618623: Give observer manager threads a name
[folly.git] / folly / experimental / observer / detail / ObserverManager.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/experimental/observer/detail/ObserverManager.h>
17
18 #include <folly/ExceptionString.h>
19 #include <folly/MPMCQueue.h>
20 #include <folly/Singleton.h>
21 #include <folly/portability/GFlags.h>
22
23 namespace folly {
24 namespace observer_detail {
25
26 FOLLY_TLS bool ObserverManager::inManagerThread_{false};
27 FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
28     ObserverManager::DependencyRecorder::currentDependencies_{nullptr};
29
30 DEFINE_int32(
31     observer_manager_pool_size,
32     4,
33     "How many internal threads ObserverManager should use");
34
35 namespace {
36 constexpr size_t kCurrentQueueSize{10 * 1024};
37 constexpr size_t kNextQueueSize{10 * 1024};
38 }
39
40 class ObserverManager::CurrentQueue {
41  public:
42   CurrentQueue() : queue_(kCurrentQueueSize) {
43     if (FLAGS_observer_manager_pool_size < 1) {
44       LOG(ERROR) << "--observer_manager_pool_size should be >= 1";
45       FLAGS_observer_manager_pool_size = 1;
46     }
47     for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) {
48       threads_.emplace_back([&]() {
49         ObserverManager::inManagerThread_ = true;
50
51         while (true) {
52           Function<void()> task;
53           queue_.blockingRead(task);
54
55           if (!task) {
56             return;
57           }
58
59           try {
60             task();
61           } catch (...) {
62             LOG(ERROR) << "Exception while running CurrentQueue task: "
63                        << exceptionStr(std::current_exception());
64           }
65         }
66       });
67     }
68   }
69
70   ~CurrentQueue() {
71     for (size_t i = 0; i < threads_.size(); ++i) {
72       queue_.blockingWrite(nullptr);
73     }
74
75     for (auto& thread : threads_) {
76       thread.join();
77     }
78
79     CHECK(queue_.isEmpty());
80   }
81
82   void add(Function<void()> task) {
83     if (ObserverManager::inManagerThread()) {
84       if (!queue_.write(std::move(task))) {
85         throw std::runtime_error("Too many Observers scheduled for update.");
86       }
87     } else {
88       queue_.blockingWrite(std::move(task));
89     }
90   }
91
92  private:
93   MPMCQueue<Function<void()>> queue_;
94   std::vector<std::thread> threads_;
95 };
96
97 class ObserverManager::NextQueue {
98  public:
99   explicit NextQueue(ObserverManager& manager)
100       : manager_(manager), queue_(kNextQueueSize) {
101     thread_ = std::thread([&]() {
102       Core::Ptr queueCore;
103
104       while (true) {
105         queue_.blockingRead(queueCore);
106
107         if (!queueCore) {
108           return;
109         }
110
111         std::vector<Core::Ptr> cores;
112         cores.emplace_back(std::move(queueCore));
113
114         {
115           SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
116
117           // We can't pick more tasks from the queue after we bumped the
118           // version, so we have to do this while holding the lock.
119           while (cores.size() < kNextQueueSize && queue_.read(queueCore)) {
120             if (!queueCore) {
121               return;
122             }
123             cores.emplace_back(std::move(queueCore));
124           }
125
126           ++manager_.version_;
127         }
128
129         for (auto& core : cores) {
130           manager_.scheduleRefresh(std::move(core), manager_.version_, true);
131         }
132       }
133     });
134   }
135
136   void add(Core::Ptr core) {
137     queue_.blockingWrite(std::move(core));
138   }
139
140   ~NextQueue() {
141     // Emtpy element signals thread to terminate
142     queue_.blockingWrite(nullptr);
143     thread_.join();
144   }
145
146  private:
147   ObserverManager& manager_;
148   MPMCQueue<Core::Ptr> queue_;
149   std::thread thread_;
150 };
151
152 ObserverManager::ObserverManager() {
153   currentQueue_ = make_unique<CurrentQueue>();
154   nextQueue_ = make_unique<NextQueue>(*this);
155 }
156
157 ObserverManager::~ObserverManager() {
158   // Destroy NextQueue, before the rest of this object, since it expects
159   // ObserverManager to be alive.
160   nextQueue_.reset();
161   currentQueue_.reset();
162 }
163
164 void ObserverManager::scheduleCurrent(Function<void()> task) {
165   currentQueue_->add(std::move(task));
166 }
167
168 void ObserverManager::scheduleNext(Core::Ptr core) {
169   nextQueue_->add(std::move(core));
170 }
171
172 struct ObserverManager::Singleton {
173   static folly::Singleton<ObserverManager> instance;
174   // MSVC 2015 doesn't let us access ObserverManager's constructor if we
175   // try to use a lambda to initialize instance, so we have to create
176   // an actual function instead.
177   static ObserverManager* createManager() {
178     return new ObserverManager();
179   }
180 };
181
182 folly::Singleton<ObserverManager> ObserverManager::Singleton::instance(
183     createManager);
184
185 std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
186   return Singleton::instance.try_get();
187 }
188 }
189 }