Give observer manager threads a name
[folly.git] / folly / experimental / observer / detail / ObserverManager.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/experimental/observer/detail/ObserverManager.h>
17
18 #include <folly/ExceptionString.h>
19 #include <folly/FixedString.h>
20 #include <folly/Format.h>
21 #include <folly/MPMCQueue.h>
22 #include <folly/Singleton.h>
23 #include <folly/ThreadName.h>
24 #include <folly/portability/GFlags.h>
25
26 namespace folly {
27 namespace observer_detail {
28
29 FOLLY_TLS bool ObserverManager::inManagerThread_{false};
30 FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
31     ObserverManager::DependencyRecorder::currentDependencies_{nullptr};
32
33 DEFINE_int32(
34     observer_manager_pool_size,
35     4,
36     "How many internal threads ObserverManager should use");
37
38 static constexpr auto kObserverManagerThreadNamePrefix =
39     folly::makeFixedString("ObserverMngr");
40
41 namespace {
42 constexpr size_t kCurrentQueueSize{10 * 1024};
43 constexpr size_t kNextQueueSize{10 * 1024};
44 }
45
46 class ObserverManager::CurrentQueue {
47  public:
48   CurrentQueue() : queue_(kCurrentQueueSize) {
49     if (FLAGS_observer_manager_pool_size < 1) {
50       LOG(ERROR) << "--observer_manager_pool_size should be >= 1";
51       FLAGS_observer_manager_pool_size = 1;
52     }
53     for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) {
54       threads_.emplace_back([this, i]() {
55         folly::setThreadName(
56             folly::sformat("{}{}", kObserverManagerThreadNamePrefix, i));
57         ObserverManager::inManagerThread_ = true;
58
59         while (true) {
60           Function<void()> task;
61           queue_.blockingRead(task);
62
63           if (!task) {
64             return;
65           }
66
67           try {
68             task();
69           } catch (...) {
70             LOG(ERROR) << "Exception while running CurrentQueue task: "
71                        << exceptionStr(std::current_exception());
72           }
73         }
74       });
75     }
76   }
77
78   ~CurrentQueue() {
79     for (size_t i = 0; i < threads_.size(); ++i) {
80       queue_.blockingWrite(nullptr);
81     }
82
83     for (auto& thread : threads_) {
84       thread.join();
85     }
86
87     CHECK(queue_.isEmpty());
88   }
89
90   void add(Function<void()> task) {
91     if (ObserverManager::inManagerThread()) {
92       if (!queue_.write(std::move(task))) {
93         throw std::runtime_error("Too many Observers scheduled for update.");
94       }
95     } else {
96       queue_.blockingWrite(std::move(task));
97     }
98   }
99
100  private:
101   MPMCQueue<Function<void()>> queue_;
102   std::vector<std::thread> threads_;
103 };
104
105 class ObserverManager::NextQueue {
106  public:
107   explicit NextQueue(ObserverManager& manager)
108       : manager_(manager), queue_(kNextQueueSize) {
109     thread_ = std::thread([&]() {
110       Core::Ptr queueCore;
111
112       while (true) {
113         queue_.blockingRead(queueCore);
114
115         if (!queueCore) {
116           return;
117         }
118
119         std::vector<Core::Ptr> cores;
120         cores.emplace_back(std::move(queueCore));
121
122         {
123           SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
124
125           // We can't pick more tasks from the queue after we bumped the
126           // version, so we have to do this while holding the lock.
127           while (cores.size() < kNextQueueSize && queue_.read(queueCore)) {
128             if (!queueCore) {
129               return;
130             }
131             cores.emplace_back(std::move(queueCore));
132           }
133
134           ++manager_.version_;
135         }
136
137         for (auto& core : cores) {
138           manager_.scheduleRefresh(std::move(core), manager_.version_, true);
139         }
140       }
141     });
142   }
143
144   void add(Core::Ptr core) {
145     queue_.blockingWrite(std::move(core));
146   }
147
148   ~NextQueue() {
149     // Emtpy element signals thread to terminate
150     queue_.blockingWrite(nullptr);
151     thread_.join();
152   }
153
154  private:
155   ObserverManager& manager_;
156   MPMCQueue<Core::Ptr> queue_;
157   std::thread thread_;
158 };
159
160 ObserverManager::ObserverManager() {
161   currentQueue_ = make_unique<CurrentQueue>();
162   nextQueue_ = make_unique<NextQueue>(*this);
163 }
164
165 ObserverManager::~ObserverManager() {
166   // Destroy NextQueue, before the rest of this object, since it expects
167   // ObserverManager to be alive.
168   nextQueue_.reset();
169   currentQueue_.reset();
170 }
171
172 void ObserverManager::scheduleCurrent(Function<void()> task) {
173   currentQueue_->add(std::move(task));
174 }
175
176 void ObserverManager::scheduleNext(Core::Ptr core) {
177   nextQueue_->add(std::move(core));
178 }
179
180 struct ObserverManager::Singleton {
181   static folly::Singleton<ObserverManager> instance;
182   // MSVC 2015 doesn't let us access ObserverManager's constructor if we
183   // try to use a lambda to initialize instance, so we have to create
184   // an actual function instead.
185   static ObserverManager* createManager() {
186     return new ObserverManager();
187   }
188 };
189
190 folly::Singleton<ObserverManager> ObserverManager::Singleton::instance(
191     createManager);
192
193 std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
194   return Singleton::instance.try_get();
195 }
196 }
197 }