experimental/observer/detail/ObserverManager.cpp missing gflags
[folly.git] / folly / experimental / observer / detail / ObserverManager.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/experimental/observer/detail/ObserverManager.h>
17
18 #include <folly/MPMCQueue.h>
19 #include <folly/Singleton.h>
20 #include <folly/portability/GFlags.h>
21
22 namespace folly {
23 namespace observer_detail {
24
25 FOLLY_TLS bool ObserverManager::inManagerThread_{false};
26 FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
27     ObserverManager::DependencyRecorder::currentDependencies_{nullptr};
28
29 DEFINE_int32(
30     observer_manager_pool_size,
31     4,
32     "How many internal threads ObserverManager should use");
33
34 namespace {
35 constexpr size_t kCurrentQueueSize{10 * 1024};
36 constexpr size_t kNextQueueSize{10 * 1024};
37 }
38
39 class ObserverManager::CurrentQueue {
40  public:
41   CurrentQueue() : queue_(kCurrentQueueSize) {
42     if (FLAGS_observer_manager_pool_size < 1) {
43       LOG(ERROR) << "--observer_manager_pool_size should be >= 1";
44       FLAGS_observer_manager_pool_size = 1;
45     }
46     for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) {
47       threads_.emplace_back([&]() {
48         ObserverManager::inManagerThread_ = true;
49
50         while (true) {
51           Function<void()> task;
52           queue_.blockingRead(task);
53
54           if (!task) {
55             return;
56           }
57
58           try {
59             task();
60           } catch (...) {
61             LOG(ERROR) << "Exception while running CurrentQueue task: "
62                        << exceptionStr(std::current_exception());
63           }
64         }
65       });
66     }
67   }
68
69   ~CurrentQueue() {
70     for (size_t i = 0; i < threads_.size(); ++i) {
71       queue_.blockingWrite(nullptr);
72     }
73
74     for (auto& thread : threads_) {
75       thread.join();
76     }
77
78     CHECK(queue_.isEmpty());
79   }
80
81   void add(Function<void()> task) {
82     if (ObserverManager::inManagerThread()) {
83       if (!queue_.write(std::move(task))) {
84         throw std::runtime_error("Too many Observers scheduled for update.");
85       }
86     } else {
87       queue_.blockingWrite(std::move(task));
88     }
89   }
90
91  private:
92   MPMCQueue<Function<void()>> queue_;
93   std::vector<std::thread> threads_;
94 };
95
96 class ObserverManager::NextQueue {
97  public:
98   explicit NextQueue(ObserverManager& manager)
99       : manager_(manager), queue_(kNextQueueSize) {
100     thread_ = std::thread([&]() {
101       Core::Ptr queueCore;
102
103       while (true) {
104         queue_.blockingRead(queueCore);
105
106         if (!queueCore) {
107           return;
108         }
109
110         std::vector<Core::Ptr> cores;
111         cores.emplace_back(std::move(queueCore));
112
113         {
114           SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
115
116           // We can't pick more tasks from the queue after we bumped the
117           // version, so we have to do this while holding the lock.
118           while (cores.size() < kNextQueueSize && queue_.read(queueCore)) {
119             if (!queueCore) {
120               return;
121             }
122             cores.emplace_back(std::move(queueCore));
123           }
124
125           ++manager_.version_;
126         }
127
128         for (auto& core : cores) {
129           manager_.scheduleRefresh(std::move(core), manager_.version_, true);
130         }
131       }
132     });
133   }
134
135   void add(Core::Ptr core) {
136     queue_.blockingWrite(std::move(core));
137   }
138
139   ~NextQueue() {
140     // Emtpy element signals thread to terminate
141     queue_.blockingWrite(nullptr);
142     thread_.join();
143   }
144
145  private:
146   ObserverManager& manager_;
147   MPMCQueue<Core::Ptr> queue_;
148   std::thread thread_;
149 };
150
151 ObserverManager::ObserverManager() {
152   currentQueue_ = make_unique<CurrentQueue>();
153   nextQueue_ = make_unique<NextQueue>(*this);
154 }
155
156 ObserverManager::~ObserverManager() {
157   // Destroy NextQueue, before the rest of this object, since it expects
158   // ObserverManager to be alive.
159   nextQueue_.reset();
160   currentQueue_.reset();
161 }
162
163 void ObserverManager::scheduleCurrent(Function<void()> task) {
164   currentQueue_->add(std::move(task));
165 }
166
167 void ObserverManager::scheduleNext(Core::Ptr core) {
168   nextQueue_->add(std::move(core));
169 }
170
171 struct ObserverManager::Singleton {
172   static folly::Singleton<ObserverManager> instance;
173   // MSVC 2015 doesn't let us access ObserverManager's constructor if we
174   // try to use a lambda to initialize instance, so we have to create
175   // an actual function instead.
176   static ObserverManager* createManager() {
177     return new ObserverManager();
178   }
179 };
180
181 folly::Singleton<ObserverManager> ObserverManager::Singleton::instance(
182     createManager);
183
184 std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
185   return Singleton::instance.try_get();
186 }
187 }
188 }