Reduce footprint of ScribeClient
[folly.git] / folly / experimental / observer / detail / ObserverManager.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/experimental/observer/detail/ObserverManager.h>
17
18 #include <folly/MPMCQueue.h>
19 #include <folly/Singleton.h>
20
21 namespace folly {
22 namespace observer_detail {
23
24 FOLLY_TLS bool ObserverManager::inManagerThread_{false};
25 FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies*
26     ObserverManager::DependencyRecorder::currentDependencies_{nullptr};
27
28 DEFINE_int32(
29     observer_manager_pool_size,
30     4,
31     "How many internal threads ObserverManager should use");
32
33 namespace {
34 constexpr size_t kCurrentQueueSize{10 * 1024};
35 constexpr size_t kNextQueueSize{10 * 1024};
36 }
37
38 class ObserverManager::CurrentQueue {
39  public:
40   CurrentQueue() : queue_(kCurrentQueueSize) {
41     if (FLAGS_observer_manager_pool_size < 1) {
42       LOG(ERROR) << "--observer_manager_pool_size should be >= 1";
43       FLAGS_observer_manager_pool_size = 1;
44     }
45     for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) {
46       threads_.emplace_back([&]() {
47         ObserverManager::inManagerThread_ = true;
48
49         while (true) {
50           Function<void()> task;
51           queue_.blockingRead(task);
52
53           if (!task) {
54             return;
55           }
56
57           try {
58             task();
59           } catch (...) {
60             LOG(ERROR) << "Exception while running CurrentQueue task: "
61                        << exceptionStr(std::current_exception());
62           }
63         }
64       });
65     }
66   }
67
68   ~CurrentQueue() {
69     for (size_t i = 0; i < threads_.size(); ++i) {
70       queue_.blockingWrite(nullptr);
71     }
72
73     for (auto& thread : threads_) {
74       thread.join();
75     }
76
77     CHECK(queue_.isEmpty());
78   }
79
80   void add(Function<void()> task) {
81     if (ObserverManager::inManagerThread()) {
82       if (!queue_.write(std::move(task))) {
83         throw std::runtime_error("Too many Observers scheduled for update.");
84       }
85     } else {
86       queue_.blockingWrite(std::move(task));
87     }
88   }
89
90  private:
91   MPMCQueue<Function<void()>> queue_;
92   std::vector<std::thread> threads_;
93 };
94
95 class ObserverManager::NextQueue {
96  public:
97   explicit NextQueue(ObserverManager& manager)
98       : manager_(manager), queue_(kNextQueueSize) {
99     thread_ = std::thread([&]() {
100       Core::Ptr queueCore;
101
102       while (true) {
103         queue_.blockingRead(queueCore);
104
105         if (!queueCore) {
106           return;
107         }
108
109         std::vector<Core::Ptr> cores;
110         cores.emplace_back(std::move(queueCore));
111
112         {
113           SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
114
115           // We can't pick more tasks from the queue after we bumped the
116           // version, so we have to do this while holding the lock.
117           while (cores.size() < kNextQueueSize && queue_.read(queueCore)) {
118             if (!queueCore) {
119               return;
120             }
121             cores.emplace_back(std::move(queueCore));
122           }
123
124           ++manager_.version_;
125         }
126
127         for (auto& core : cores) {
128           manager_.scheduleRefresh(std::move(core), manager_.version_, true);
129         }
130       }
131     });
132   }
133
134   void add(Core::Ptr core) {
135     queue_.blockingWrite(std::move(core));
136   }
137
138   ~NextQueue() {
139     // Emtpy element signals thread to terminate
140     queue_.blockingWrite(nullptr);
141     thread_.join();
142   }
143
144  private:
145   ObserverManager& manager_;
146   MPMCQueue<Core::Ptr> queue_;
147   std::thread thread_;
148 };
149
150 ObserverManager::ObserverManager() {
151   currentQueue_ = make_unique<CurrentQueue>();
152   nextQueue_ = make_unique<NextQueue>(*this);
153 }
154
155 ObserverManager::~ObserverManager() {
156   // Destroy NextQueue, before the rest of this object, since it expects
157   // ObserverManager to be alive.
158   nextQueue_.reset();
159   currentQueue_.reset();
160 }
161
162 void ObserverManager::scheduleCurrent(Function<void()> task) {
163   currentQueue_->add(std::move(task));
164 }
165
166 void ObserverManager::scheduleNext(Core::Ptr core) {
167   nextQueue_->add(std::move(core));
168 }
169
170 struct ObserverManager::Singleton {
171   static folly::Singleton<ObserverManager> instance;
172   // MSVC 2015 doesn't let us access ObserverManager's constructor if we
173   // try to use a lambda to initialize instance, so we have to create
174   // an actual function instead.
175   static ObserverManager* createManager() {
176     return new ObserverManager();
177   }
178 };
179
180 folly::Singleton<ObserverManager> ObserverManager::Singleton::instance(
181     createManager);
182
183 std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
184   return Singleton::instance.try_get();
185 }
186 }
187 }