From 9009c2b491bdc8bcfe8eab37d688063242006c83 Mon Sep 17 00:00:00 2001 From: Dave Watson Date: Tue, 6 Jan 2015 10:36:31 -0800 Subject: [PATCH] Thread Observer Summary: Observer methods, so users of IOThreadPoolExecutor can do stuff when threads are added/removed. As a use case, previously the thrift server only used the threads already started when it started up, and assumed iothreadpool was never resized. Test Plan: Added several unittests Reviewed By: jsedgwick@fb.com Subscribers: trunkagent, doug, fugalh, alandau, bmatheny, mshneer, folly-diffs@ FB internal diff: D1753861 Signature: t1:1753861:1420236825:54cbdfee0efb3b97dea35faba29c134f2b10a480 --- .../concurrent/CPUThreadPoolExecutor.cpp | 4 ++ .../concurrent/IOThreadPoolExecutor.cpp | 24 +++++---- .../wangle/concurrent/IOThreadPoolExecutor.h | 2 +- .../wangle/concurrent/ThreadPoolExecutor.cpp | 28 ++++++++++ folly/wangle/concurrent/ThreadPoolExecutor.h | 30 ++++++++++- .../test/ThreadPoolExecutorTest.cpp | 53 +++++++++++++++++++ 6 files changed, 129 insertions(+), 12 deletions(-) diff --git a/folly/wangle/concurrent/CPUThreadPoolExecutor.cpp b/folly/wangle/concurrent/CPUThreadPoolExecutor.cpp index a03c6151..e0ad08c8 100644 --- a/folly/wangle/concurrent/CPUThreadPoolExecutor.cpp +++ b/folly/wangle/concurrent/CPUThreadPoolExecutor.cpp @@ -105,6 +105,10 @@ void CPUThreadPoolExecutor::threadRun(std::shared_ptr thread) { auto task = taskQueue_->take(); if (UNLIKELY(task.poison)) { CHECK(threadsToStop_-- > 0); + for (auto& o : observers_) { + o->threadStopped(thread.get()); + } + stoppedThreads_.add(thread); return; } else { diff --git a/folly/wangle/concurrent/IOThreadPoolExecutor.cpp b/folly/wangle/concurrent/IOThreadPoolExecutor.cpp index 5c97bf44..721083a1 100644 --- a/folly/wangle/concurrent/IOThreadPoolExecutor.cpp +++ b/folly/wangle/concurrent/IOThreadPoolExecutor.cpp @@ -117,6 +117,17 @@ EventBase* IOThreadPoolExecutor::getEventBase() { return pickThread()->eventBase; } +EventBase* IOThreadPoolExecutor::getEventBase( + ThreadPoolExecutor::ThreadHandle* h) { + auto thread = dynamic_cast(h); + + if (thread) { + return thread->eventBase; + } + + return nullptr; +} + std::shared_ptr IOThreadPoolExecutor::makeThread() { return std::make_shared(this); @@ -148,21 +159,14 @@ void IOThreadPoolExecutor::stopThreads(size_t n) { for (size_t i = 0; i < n; i++) { const auto ioThread = std::static_pointer_cast( threadList_.get()[i]); + for (auto& o : observers_) { + o->threadStopped(ioThread.get()); + } ioThread->shouldRun = false; ioThread->eventBase->terminateLoopSoon(); } } -std::vector IOThreadPoolExecutor::getEventBases() { - std::vector bases; - RWSpinLock::ReadHolder{&threadListLock_}; - for (const auto& thread : threadList_.get()) { - auto ioThread = std::static_pointer_cast(thread); - bases.push_back(ioThread->eventBase); - } - return bases; -} - // threadListLock_ is readlocked uint64_t IOThreadPoolExecutor::getPendingTaskCount() { uint64_t count = 0; diff --git a/folly/wangle/concurrent/IOThreadPoolExecutor.h b/folly/wangle/concurrent/IOThreadPoolExecutor.h index 7c919d1e..1e4e8b07 100644 --- a/folly/wangle/concurrent/IOThreadPoolExecutor.h +++ b/folly/wangle/concurrent/IOThreadPoolExecutor.h @@ -41,7 +41,7 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor { EventBase* getEventBase() override; - std::vector getEventBases(); + EventBase* getEventBase(ThreadPoolExecutor::ThreadHandle*); private: struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread { diff --git a/folly/wangle/concurrent/ThreadPoolExecutor.cpp b/folly/wangle/concurrent/ThreadPoolExecutor.cpp index 40694754..25660db1 100644 --- a/folly/wangle/concurrent/ThreadPoolExecutor.cpp +++ b/folly/wangle/concurrent/ThreadPoolExecutor.cpp @@ -99,6 +99,11 @@ void ThreadPoolExecutor::addThreads(size_t n) { for (auto& thread : newThreads) { thread->startupBaton.wait(); } + for (auto& o : observers_) { + for (auto& thread : newThreads) { + o->threadStarted(thread.get()); + } + } } // threadListLock_ is writelocked @@ -171,4 +176,27 @@ size_t ThreadPoolExecutor::StoppedThreadQueue::size() { return queue_.size(); } +void ThreadPoolExecutor::addObserver(std::shared_ptr o) { + RWSpinLock::ReadHolder{&threadListLock_}; + observers_.push_back(o); + for (auto& thread : threadList_.get()) { + o->threadPreviouslyStarted(thread.get()); + } +} + +void ThreadPoolExecutor::removeObserver(std::shared_ptr o) { + RWSpinLock::ReadHolder{&threadListLock_}; + for (auto& thread : threadList_.get()) { + o->threadNotYetStopped(thread.get()); + } + + for (auto it = observers_.begin(); it != observers_.end(); it++) { + if (*it == o) { + observers_.erase(it); + return; + } + } + DCHECK(false); +} + }} // folly::wangle diff --git a/folly/wangle/concurrent/ThreadPoolExecutor.h b/folly/wangle/concurrent/ThreadPoolExecutor.h index be8f7968..f978a5e3 100644 --- a/folly/wangle/concurrent/ThreadPoolExecutor.h +++ b/folly/wangle/concurrent/ThreadPoolExecutor.h @@ -85,13 +85,40 @@ class ThreadPoolExecutor : public virtual Executor { return taskStatsSubject_->subscribe(observer); } + /** + * Base class for threads created with ThreadPoolExecutor. + * Some subclasses have methods that operate on these + * handles. + */ + class ThreadHandle { + public: + virtual ~ThreadHandle() = default; + }; + + /** + * Observer interface for thread start/stop. + * Provides hooks so actions can be taken when + * threads are created + */ + class Observer { + public: + virtual void threadStarted(ThreadHandle*) = 0; + virtual void threadStopped(ThreadHandle*) = 0; + virtual void threadPreviouslyStarted(ThreadHandle*) = 0; + virtual void threadNotYetStopped(ThreadHandle*) = 0; + virtual ~Observer() = default; + }; + + void addObserver(std::shared_ptr); + void removeObserver(std::shared_ptr); + protected: // Prerequisite: threadListLock_ writelocked void addThreads(size_t n); // Prerequisite: threadListLock_ writelocked void removeThreads(size_t n, bool isJoin); - struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread { + struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread : public ThreadHandle { explicit Thread(ThreadPoolExecutor* pool) : id(nextId++), handle(), @@ -185,6 +212,7 @@ class ThreadPoolExecutor : public virtual Executor { std::atomic isJoin_; // whether the current downsizing is a join std::shared_ptr> taskStatsSubject_; + std::vector> observers_; }; }} // folly::wangle diff --git a/folly/wangle/concurrent/test/ThreadPoolExecutorTest.cpp b/folly/wangle/concurrent/test/ThreadPoolExecutorTest.cpp index 596e2784..385d2b0e 100644 --- a/folly/wangle/concurrent/test/ThreadPoolExecutorTest.cpp +++ b/folly/wangle/concurrent/test/ThreadPoolExecutorTest.cpp @@ -318,3 +318,56 @@ TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) { pool.join(); EXPECT_EQ(100, completed); } + +class TestObserver : public ThreadPoolExecutor::Observer { + public: + void threadStarted(ThreadPoolExecutor::ThreadHandle*) { + threads_++; + } + void threadStopped(ThreadPoolExecutor::ThreadHandle*) { + threads_--; + } + void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) { + threads_++; + } + void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) { + threads_--; + } + void checkCalls() { + ASSERT_EQ(threads_, 0); + } + private: + int threads_{0}; +}; + +TEST(ThreadPoolExecutorTest, IOObserver) { + auto observer = std::make_shared(); + + { + IOThreadPoolExecutor exe(10); + exe.addObserver(observer); + exe.setNumThreads(3); + exe.setNumThreads(0); + exe.setNumThreads(7); + exe.removeObserver(observer); + exe.setNumThreads(10); + } + + observer->checkCalls(); +} + +TEST(ThreadPoolExecutorTest, CPUObserver) { + auto observer = std::make_shared(); + + { + CPUThreadPoolExecutor exe(10); + exe.addObserver(observer); + exe.setNumThreads(3); + exe.setNumThreads(0); + exe.setNumThreads(7); + exe.removeObserver(observer); + exe.setNumThreads(10); + } + + observer->checkCalls(); +} -- 2.34.1