7654dff5e93399f665ada289af2a0de622caef1b
[folly.git] / folly / experimental / observer / detail / ObserverManager.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/experimental/observer/detail/ObserverManager.h>
17
18 #include <folly/ExceptionString.h>
19 #include <folly/Format.h>
20 #include <folly/MPMCQueue.h>
21 #include <folly/Range.h>
22 #include <folly/Singleton.h>
23 #include <folly/ThreadName.h>
24 #include <folly/portability/GFlags.h>
25
26 namespace folly {
27 namespace observer_detail {
28
29 FOLLY_TLS bool ObserverManager::inManagerThread_{false};
30 FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
31     ObserverManager::DependencyRecorder::currentDependencies_{nullptr};
32
33 DEFINE_int32(
34     observer_manager_pool_size,
35     4,
36     "How many internal threads ObserverManager should use");
37
38 static constexpr StringPiece kObserverManagerThreadNamePrefix{"ObserverMngr"};
39
40 namespace {
41 constexpr size_t kCurrentQueueSize{10 * 1024};
42 constexpr size_t kNextQueueSize{10 * 1024};
43 }
44
45 class ObserverManager::CurrentQueue {
46  public:
47   CurrentQueue() : queue_(kCurrentQueueSize) {
48     if (FLAGS_observer_manager_pool_size < 1) {
49       LOG(ERROR) << "--observer_manager_pool_size should be >= 1";
50       FLAGS_observer_manager_pool_size = 1;
51     }
52     for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) {
53       threads_.emplace_back([this, i]() {
54         folly::setThreadName(
55             folly::sformat("{}{}", kObserverManagerThreadNamePrefix, i));
56         ObserverManager::inManagerThread_ = true;
57
58         while (true) {
59           Function<void()> task;
60           queue_.blockingRead(task);
61
62           if (!task) {
63             return;
64           }
65
66           try {
67             task();
68           } catch (...) {
69             LOG(ERROR) << "Exception while running CurrentQueue task: "
70                        << exceptionStr(std::current_exception());
71           }
72         }
73       });
74     }
75   }
76
77   ~CurrentQueue() {
78     for (size_t i = 0; i < threads_.size(); ++i) {
79       queue_.blockingWrite(nullptr);
80     }
81
82     for (auto& thread : threads_) {
83       thread.join();
84     }
85
86     CHECK(queue_.isEmpty());
87   }
88
89   void add(Function<void()> task) {
90     if (ObserverManager::inManagerThread()) {
91       if (!queue_.write(std::move(task))) {
92         throw std::runtime_error("Too many Observers scheduled for update.");
93       }
94     } else {
95       queue_.blockingWrite(std::move(task));
96     }
97   }
98
99  private:
100   MPMCQueue<Function<void()>> queue_;
101   std::vector<std::thread> threads_;
102 };
103
104 class ObserverManager::NextQueue {
105  public:
106   explicit NextQueue(ObserverManager& manager)
107       : manager_(manager), queue_(kNextQueueSize) {
108     thread_ = std::thread([&]() {
109       Core::Ptr queueCore;
110
111       while (true) {
112         queue_.blockingRead(queueCore);
113
114         if (!queueCore) {
115           return;
116         }
117
118         std::vector<Core::Ptr> cores;
119         cores.emplace_back(std::move(queueCore));
120
121         {
122           SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
123
124           // We can't pick more tasks from the queue after we bumped the
125           // version, so we have to do this while holding the lock.
126           while (cores.size() < kNextQueueSize && queue_.read(queueCore)) {
127             if (!queueCore) {
128               return;
129             }
130             cores.emplace_back(std::move(queueCore));
131           }
132
133           ++manager_.version_;
134         }
135
136         for (auto& core : cores) {
137           manager_.scheduleRefresh(std::move(core), manager_.version_, true);
138         }
139       }
140     });
141   }
142
143   void add(Core::Ptr core) {
144     queue_.blockingWrite(std::move(core));
145   }
146
147   ~NextQueue() {
148     // Emtpy element signals thread to terminate
149     queue_.blockingWrite(nullptr);
150     thread_.join();
151   }
152
153  private:
154   ObserverManager& manager_;
155   MPMCQueue<Core::Ptr> queue_;
156   std::thread thread_;
157 };
158
159 ObserverManager::ObserverManager() {
160   currentQueue_ = make_unique<CurrentQueue>();
161   nextQueue_ = make_unique<NextQueue>(*this);
162 }
163
164 ObserverManager::~ObserverManager() {
165   // Destroy NextQueue, before the rest of this object, since it expects
166   // ObserverManager to be alive.
167   nextQueue_.reset();
168   currentQueue_.reset();
169 }
170
171 void ObserverManager::scheduleCurrent(Function<void()> task) {
172   currentQueue_->add(std::move(task));
173 }
174
175 void ObserverManager::scheduleNext(Core::Ptr core) {
176   nextQueue_->add(std::move(core));
177 }
178
179 struct ObserverManager::Singleton {
180   static folly::Singleton<ObserverManager> instance;
181   // MSVC 2015 doesn't let us access ObserverManager's constructor if we
182   // try to use a lambda to initialize instance, so we have to create
183   // an actual function instead.
184   static ObserverManager* createManager() {
185     return new ObserverManager();
186   }
187 };
188
189 folly::Singleton<ObserverManager> ObserverManager::Singleton::instance(
190     createManager);
191
192 std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
193   return Singleton::instance.try_get();
194 }
195 }
196 }