/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2016-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <folly/experimental/observer/detail/ObserverManager.h>
#include <folly/ExceptionString.h>
+#include <folly/Format.h>
#include <folly/MPMCQueue.h>
+#include <folly/Range.h>
#include <folly/Singleton.h>
#include <folly/portability/GFlags.h>
+#include <folly/system/ThreadName.h>
namespace folly {
namespace observer_detail {
4,
"How many internal threads ObserverManager should use");
+static constexpr StringPiece kObserverManagerThreadNamePrefix{"ObserverMngr"};
+
namespace {
constexpr size_t kCurrentQueueSize{10 * 1024};
constexpr size_t kNextQueueSize{10 * 1024};
-}
+} // namespace
class ObserverManager::CurrentQueue {
public:
FLAGS_observer_manager_pool_size = 1;
}
for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) {
- threads_.emplace_back([&]() {
+ threads_.emplace_back([this, i]() {
+ folly::setThreadName(
+ folly::sformat("{}{}", kObserverManagerThreadNamePrefix, i));
ObserverManager::inManagerThread_ = true;
while (true) {
explicit NextQueue(ObserverManager& manager)
: manager_(manager), queue_(kNextQueueSize) {
thread_ = std::thread([&]() {
- Core::Ptr queueCore;
+ Core::WeakPtr queueCoreWeak;
while (true) {
- queue_.blockingRead(queueCore);
-
- if (!queueCore) {
+ queue_.blockingRead(queueCoreWeak);
+ if (stop_) {
return;
}
std::vector<Core::Ptr> cores;
- cores.emplace_back(std::move(queueCore));
+ {
+ auto queueCore = queueCoreWeak.lock();
+ if (!queueCore) {
+ continue;
+ }
+ cores.emplace_back(std::move(queueCore));
+ }
{
SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_);
// We can't pick more tasks from the queue after we bumped the
// version, so we have to do this while holding the lock.
- while (cores.size() < kNextQueueSize && queue_.read(queueCore)) {
- if (!queueCore) {
+ while (cores.size() < kNextQueueSize && queue_.read(queueCoreWeak)) {
+ if (stop_) {
return;
}
- cores.emplace_back(std::move(queueCore));
+ if (auto queueCore = queueCoreWeak.lock()) {
+ cores.emplace_back(std::move(queueCore));
+ }
}
++manager_.version_;
});
}
- void add(Core::Ptr core) {
+ void add(Core::WeakPtr core) {
queue_.blockingWrite(std::move(core));
}
~NextQueue() {
- // Emtpy element signals thread to terminate
- queue_.blockingWrite(nullptr);
+ stop_ = true;
+ // Write to the queue to notify the thread.
+ queue_.blockingWrite(Core::WeakPtr());
thread_.join();
}
private:
ObserverManager& manager_;
- MPMCQueue<Core::Ptr> queue_;
+ MPMCQueue<Core::WeakPtr> queue_;
std::thread thread_;
+ std::atomic<bool> stop_{false};
};
ObserverManager::ObserverManager() {
- currentQueue_ = make_unique<CurrentQueue>();
- nextQueue_ = make_unique<NextQueue>(*this);
+ currentQueue_ = std::make_unique<CurrentQueue>();
+ nextQueue_ = std::make_unique<NextQueue>(*this);
}
ObserverManager::~ObserverManager() {
currentQueue_->add(std::move(task));
}
-void ObserverManager::scheduleNext(Core::Ptr core) {
+void ObserverManager::scheduleNext(Core::WeakPtr core) {
nextQueue_->add(std::move(core));
}
std::shared_ptr<ObserverManager> ObserverManager::getInstance() {
return Singleton::instance.try_get();
}
-}
-}
+} // namespace observer_detail
+} // namespace folly