#include <folly/experimental/observer/detail/ObserverManager.h>
#include <folly/ExceptionString.h>
+#include <folly/Format.h>
#include <folly/MPMCQueue.h>
+#include <folly/Range.h>
#include <folly/Singleton.h>
+#include <folly/ThreadName.h>
#include <folly/portability/GFlags.h>
namespace folly {
4,
"How many internal threads ObserverManager should use");
+static constexpr StringPiece kObserverManagerThreadNamePrefix{"ObserverMngr"};
+
namespace {
constexpr size_t kCurrentQueueSize{10 * 1024};
constexpr size_t kNextQueueSize{10 * 1024};
FLAGS_observer_manager_pool_size = 1;
}
for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) {
- threads_.emplace_back([&]() {
+ threads_.emplace_back([this, i]() {
+ folly::setThreadName(
+ folly::sformat("{}{}", kObserverManagerThreadNamePrefix, i));
ObserverManager::inManagerThread_ = true;
while (true) {
explicit NextQueue(ObserverManager& manager)
: manager_(manager), queue_(kNextQueueSize) {
thread_ = std::thread([&]() {
- Core::Ptr queueCore;
+ Core::WeakPtr queueCoreWeak;
while (true) {
- queue_.blockingRead(queueCore);
-
- if (!queueCore) {
+ queue_.blockingRead(queueCoreWeak);
+ if (stop_) {
return;
}
std::vector<Core::Ptr> cores;
- cores.emplace_back(std::move(queueCore));
+ {
+ auto queueCore = queueCoreWeak.lock();
+ if (!queueCore) {
+ continue;
+ }
+ cores.emplace_back(std::move(queueCore));
+ }
{
SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
// We can't pick more tasks from the queue after we bumped the
// version, so we have to do this while holding the lock.
- while (cores.size() < kNextQueueSize && queue_.read(queueCore)) {
- if (!queueCore) {
+ while (cores.size() < kNextQueueSize && queue_.read(queueCoreWeak)) {
+ if (stop_) {
return;
}
- cores.emplace_back(std::move(queueCore));
+ if (auto queueCore = queueCoreWeak.lock()) {
+ cores.emplace_back(std::move(queueCore));
+ }
}
++manager_.version_;
});
}
- void add(Core::Ptr core) {
+ void add(Core::WeakPtr core) {
queue_.blockingWrite(std::move(core));
}
~NextQueue() {
- // Emtpy element signals thread to terminate
- queue_.blockingWrite(nullptr);
+ stop_ = true;
+ // Write to the queue to notify the thread.
+ queue_.blockingWrite(Core::WeakPtr());
thread_.join();
}
private:
ObserverManager& manager_;
- MPMCQueue<Core::Ptr> queue_;
+ MPMCQueue<Core::WeakPtr> queue_;
std::thread thread_;
+ std::atomic<bool> stop_{false};
};
ObserverManager::ObserverManager() {
currentQueue_->add(std::move(task));
}
-void ObserverManager::scheduleNext(Core::Ptr core) {
+void ObserverManager::scheduleNext(Core::WeakPtr core) {
nextQueue_->add(std::move(core));
}