X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;ds=sidebyside;f=folly%2Fexperimental%2Fobserver%2Fdetail%2FObserverManager.cpp;h=f909ef57f3da31a9d88df714e4157e1b10ba51d7;hb=2642bd3dcf9658be7da3e5b5bc622fe051200e97;hp=9f8b7b7f1afa83fcdacf0f3e0cca41464667941f;hpb=74b5413059e7be72dbc52ba815160bd6fe825835;p=folly.git diff --git a/folly/experimental/observer/detail/ObserverManager.cpp b/folly/experimental/observer/detail/ObserverManager.cpp index 9f8b7b7f..f909ef57 100644 --- a/folly/experimental/observer/detail/ObserverManager.cpp +++ b/folly/experimental/observer/detail/ObserverManager.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2016 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,8 +15,13 @@ */ #include +#include +#include #include +#include #include +#include +#include namespace folly { namespace observer_detail { @@ -25,8 +30,14 @@ FOLLY_TLS bool ObserverManager::inManagerThread_{false}; FOLLY_TLS ObserverManager::DependencyRecorder::Dependencies* ObserverManager::DependencyRecorder::currentDependencies_{nullptr}; +DEFINE_int32( + observer_manager_pool_size, + 4, + "How many internal threads ObserverManager should use"); + +static constexpr StringPiece kObserverManagerThreadNamePrefix{"ObserverMngr"}; + namespace { -constexpr size_t kCurrentThreadPoolSize{4}; constexpr size_t kCurrentQueueSize{10 * 1024}; constexpr size_t kNextQueueSize{10 * 1024}; } @@ -34,8 +45,14 @@ constexpr size_t kNextQueueSize{10 * 1024}; class ObserverManager::CurrentQueue { public: CurrentQueue() : queue_(kCurrentQueueSize) { - for (size_t i = 0; i < kCurrentThreadPoolSize; ++i) { - threads_.emplace_back([&]() { + if (FLAGS_observer_manager_pool_size < 1) { + LOG(ERROR) << "--observer_manager_pool_size should be >= 1"; + FLAGS_observer_manager_pool_size = 1; + } + for (int32_t i = 0; i < FLAGS_observer_manager_pool_size; ++i) { + threads_.emplace_back([this, i]() { + folly::setThreadName( + folly::sformat("{}{}", kObserverManagerThreadNamePrefix, i)); ObserverManager::inManagerThread_ = true; while (true) { @@ -89,28 +106,35 @@ class ObserverManager::NextQueue { explicit NextQueue(ObserverManager& manager) : manager_(manager), queue_(kNextQueueSize) { thread_ = std::thread([&]() { - Core::Ptr queueCore; + Core::WeakPtr queueCoreWeak; while (true) { - queue_.blockingRead(queueCore); - - if (!queueCore) { + queue_.blockingRead(queueCoreWeak); + if (stop_) { return; } std::vector cores; - cores.emplace_back(std::move(queueCore)); + { + auto queueCore = queueCoreWeak.lock(); + if (!queueCore) { + continue; + } + cores.emplace_back(std::move(queueCore)); + } { SharedMutexReadPriority::WriteHolder wh(manager_.versionMutex_); // We can't pick more tasks from the queue after we bumped the // version, so we have to do this while holding the lock. - while (cores.size() < kNextQueueSize && queue_.read(queueCore)) { - if (!queueCore) { + while (cores.size() < kNextQueueSize && queue_.read(queueCoreWeak)) { + if (stop_) { return; } - cores.emplace_back(std::move(queueCore)); + if (auto queueCore = queueCoreWeak.lock()) { + cores.emplace_back(std::move(queueCore)); + } } ++manager_.version_; @@ -123,20 +147,22 @@ class ObserverManager::NextQueue { }); } - void add(Core::Ptr core) { + void add(Core::WeakPtr core) { queue_.blockingWrite(std::move(core)); } ~NextQueue() { - // Emtpy element signals thread to terminate - queue_.blockingWrite(nullptr); + stop_ = true; + // Write to the queue to notify the thread. + queue_.blockingWrite(Core::WeakPtr()); thread_.join(); } private: ObserverManager& manager_; - MPMCQueue queue_; + MPMCQueue queue_; std::thread thread_; + std::atomic stop_{false}; }; ObserverManager::ObserverManager() { @@ -155,7 +181,7 @@ void ObserverManager::scheduleCurrent(Function task) { currentQueue_->add(std::move(task)); } -void ObserverManager::scheduleNext(Core::Ptr core) { +void ObserverManager::scheduleNext(Core::WeakPtr core) { nextQueue_->add(std::move(core)); }