From 53367378d6e5455dfbc649b325cfa583c54227ac Mon Sep 17 00:00:00 2001 From: Andrii Grynenko Date: Thu, 9 Feb 2017 10:58:07 -0800 Subject: [PATCH] Add keepAlive() mechanism Summary: EventBase and VirtualEventBase already had a loopKeepAlive() mechanism, which enabled libraries to prevent EventBase/VirtualEventBase from being destroyed until all keep-alive tokens were released. This change adds generic keepAlive() support into folly::Executor. folly::Executors which don't support keep-alive mechanism yet, will just return a no-op KeepAlive token. Reviewed By: yfeldblum Differential Revision: D4516649 fbshipit-source-id: 869779621c746cb14d985aa73bc4536859914c03 --- folly/Executor.cpp | 7 +++ folly/Executor.h | 41 +++++++++++++++ folly/fibers/EventBaseLoopController-inl.h | 2 +- folly/fibers/EventBaseLoopController.h | 2 +- folly/io/async/EventBase.h | 32 ++++-------- folly/io/async/VirtualEventBase.cpp | 4 +- folly/io/async/VirtualEventBase.h | 60 +++++++++------------- folly/io/async/test/EventBaseTest.cpp | 17 +++--- 8 files changed, 96 insertions(+), 69 deletions(-) diff --git a/folly/Executor.cpp b/folly/Executor.cpp index 91f98848..42d1f04a 100644 --- a/folly/Executor.cpp +++ b/folly/Executor.cpp @@ -18,10 +18,17 @@ #include +#include + namespace folly { void Executor::addWithPriority(Func, int8_t /* priority */) { throw std::runtime_error( "addWithPriority() is not implemented for this Executor"); } + +void Executor::keepAliveRelease() { + LOG(FATAL) << "keepAliveRelease() should not be called for folly::Executors " + << "which do not implement getKeepAliveToken()"; +} } diff --git a/folly/Executor.h b/folly/Executor.h index 258100cd..b4f10779 100644 --- a/folly/Executor.h +++ b/folly/Executor.h @@ -55,6 +55,47 @@ class Executor { void addPtr(P fn) { this->add([fn]() mutable { (*fn)(); }); } + + class KeepAlive { + public: + KeepAlive() {} + + void reset() { + executor_.reset(); + } + + explicit operator bool() const { + return executor_ != nullptr; + } + + private: + friend class Executor; + explicit KeepAlive(folly::Executor* executor) : executor_(executor) {} + + struct Deleter { + void operator()(folly::Executor* executor) { + executor->keepAliveRelease(); + } + }; + std::unique_ptr executor_; + }; + + /// Returns a keep-alive token which guarantees that Executor will keep + /// processing tasks until the token is released. keep-alive token can only + /// be destroyed from within the task, scheduled to be run on an executor. + /// + /// If executor does not support keep-alive functionality - dummy token will + /// be returned. + virtual KeepAlive getKeepAliveToken() { + return {}; + } + + protected: + virtual void keepAliveRelease(); + + KeepAlive makeKeepAlive() { + return KeepAlive{this}; + } }; } // folly diff --git a/folly/fibers/EventBaseLoopController-inl.h b/folly/fibers/EventBaseLoopController-inl.h index 09f260c2..025341ce 100644 --- a/folly/fibers/EventBaseLoopController-inl.h +++ b/folly/fibers/EventBaseLoopController-inl.h @@ -72,7 +72,7 @@ inline void EventBaseLoopControllerT::cancel() { template inline void EventBaseLoopControllerT::runLoop() { if (!eventBaseKeepAlive_) { - eventBaseKeepAlive_ = eventBase_->loopKeepAlive(); + eventBaseKeepAlive_ = eventBase_->getKeepAliveToken(); } if (loopRunner_) { loopRunner_->run([&] { fm_->loopUntilNoReadyImpl(); }); diff --git a/folly/fibers/EventBaseLoopController.h b/folly/fibers/EventBaseLoopController.h index bdc6cd72..f389a45e 100644 --- a/folly/fibers/EventBaseLoopController.h +++ b/folly/fibers/EventBaseLoopController.h @@ -93,7 +93,7 @@ class EventBaseLoopControllerT : public LoopController { bool awaitingScheduling_{false}; EventBaseT* eventBase_{nullptr}; - typename EventBaseT::LoopKeepAlive eventBaseKeepAlive_; + Executor::KeepAlive eventBaseKeepAlive_; ControllerCallback callback_; DestructionCallback destructionCallback_; FiberManager* fm_{nullptr}; diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index cc23fed2..c318d609 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -600,33 +600,17 @@ class EventBase : private boost::noncopyable, loopOnce(); } - struct LoopKeepAliveDeleter { - void operator()(EventBase* evb) { - DCHECK(evb->isInEventBaseThread()); - evb->loopKeepAliveCount_--; - } - }; - using LoopKeepAlive = std::unique_ptr; - /// Returns you a handle which make loop() behave like loopForever() until /// destroyed. loop() will return to its original behavior only when all /// loop keep-alives are released. Loop holder is safe to release only from /// EventBase thread. - /// - /// May return no op LoopKeepAlive if loopForever() is already running. - LoopKeepAlive loopKeepAlive() { - DCHECK(isInEventBaseThread()); - loopKeepAliveCount_++; - return LoopKeepAlive(this); - } - - // Thread-safe version of loopKeepAlive() - LoopKeepAlive loopKeepAliveAtomic() { + KeepAlive getKeepAliveToken() override { if (inRunningEventBaseThread()) { - return loopKeepAlive(); + loopKeepAliveCount_++; + } else { + loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed); } - loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed); - return LoopKeepAlive(this); + return makeKeepAlive(); } // TimeoutManager @@ -645,6 +629,12 @@ class EventBase : private boost::noncopyable, return isInEventBaseThread(); } + protected: + void keepAliveRelease() override { + DCHECK(isInEventBaseThread()); + loopKeepAliveCount_--; + } + private: void applyLoopKeepAlive(); diff --git a/folly/io/async/VirtualEventBase.cpp b/folly/io/async/VirtualEventBase.cpp index cccac60c..c1dbe2e7 100644 --- a/folly/io/async/VirtualEventBase.cpp +++ b/folly/io/async/VirtualEventBase.cpp @@ -18,8 +18,8 @@ namespace folly { VirtualEventBase::VirtualEventBase(EventBase& evb) : evb_(evb) { - evbLoopKeepAlive_ = evb_.loopKeepAliveAtomic(); - loopKeepAlive_ = loopKeepAliveAtomic(); + evbLoopKeepAlive_ = evb_.getKeepAliveToken(); + loopKeepAlive_ = getKeepAliveToken(); } VirtualEventBase::~VirtualEventBase() { diff --git a/folly/io/async/VirtualEventBase.h b/folly/io/async/VirtualEventBase.h index f3be2650..f356d50b 100644 --- a/folly/io/async/VirtualEventBase.h +++ b/folly/io/async/VirtualEventBase.h @@ -27,7 +27,7 @@ namespace folly { * * Multiple VirtualEventBases can be backed by a single EventBase. Similarly * to EventBase, VirtualEventBase implements loopKeepAlive() functionality, - * which allows callbacks holding LoopKeepAlive token to keep EventBase looping + * which allows callbacks holding KeepAlive token to keep EventBase looping * until they are complete. * * VirtualEventBase destructor blocks until all its KeepAliveTokens are released @@ -75,11 +75,11 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager { */ template void runInEventBaseThread(F&& f) { - // LoopKeepAlive token has to be released in the EventBase thread. If - // runInEventBaseThread() fails, we can't extract the LoopKeepAlive token + // KeepAlive token has to be released in the EventBase thread. If + // runInEventBaseThread() fails, we can't extract the KeepAlive token // from the callback to properly release it. CHECK(evb_.runInEventBaseThread([ - keepAlive = loopKeepAliveAtomic(), + keepAliveToken = getKeepAliveToken(), f = std::forward(f) ]() mutable { f(); })); } @@ -122,47 +122,35 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager { runInEventBaseThread(std::move(f)); } - struct LoopKeepAliveDeleter { - void operator()(VirtualEventBase* evb) { - DCHECK(evb->getEventBase().inRunningEventBaseThread()); - if (evb->loopKeepAliveCountAtomic_.load()) { - evb->loopKeepAliveCount_ += evb->loopKeepAliveCountAtomic_.exchange(0); - } - DCHECK(evb->loopKeepAliveCount_ > 0); - if (--evb->loopKeepAliveCount_ == 0) { - evb->loopKeepAliveBaton_.post(); - } - } - }; - using LoopKeepAlive = std::unique_ptr; - /** * Returns you a handle which prevents VirtualEventBase from being destroyed. - * LoopKeepAlive handle can be released from EventBase loop only. - * - * loopKeepAlive() can be called from EventBase thread only. + * KeepAlive handle can be released from EventBase loop only. */ - LoopKeepAlive loopKeepAlive() { - DCHECK(evb_.isInEventBaseThread()); - ++loopKeepAliveCount_; - return LoopKeepAlive(this); - } - - /** - * Thread-safe version of loopKeepAlive() - */ - LoopKeepAlive loopKeepAliveAtomic() { + KeepAlive getKeepAliveToken() override { if (evb_.inRunningEventBaseThread()) { - return loopKeepAlive(); + ++loopKeepAliveCount_; + } else { + ++loopKeepAliveCountAtomic_; } - ++loopKeepAliveCountAtomic_; - return LoopKeepAlive(this); + return makeKeepAlive(); } bool inRunningEventBaseThread() const { return evb_.inRunningEventBaseThread(); } + protected: + void keepAliveRelease() override { + DCHECK(getEventBase().inRunningEventBaseThread()); + if (loopKeepAliveCountAtomic_.load()) { + loopKeepAliveCount_ += loopKeepAliveCountAtomic_.exchange(0); + } + DCHECK(loopKeepAliveCount_ > 0); + if (--loopKeepAliveCount_ == 0) { + loopKeepAliveBaton_.post(); + } + } + private: using LoopCallbackList = EventBase::LoopCallback::List; @@ -171,9 +159,9 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager { ssize_t loopKeepAliveCount_{0}; std::atomic loopKeepAliveCountAtomic_{0}; folly::Baton<> loopKeepAliveBaton_; - LoopKeepAlive loopKeepAlive_; + KeepAlive loopKeepAlive_; - EventBase::LoopKeepAlive evbLoopKeepAlive_; + KeepAlive evbLoopKeepAlive_; folly::Synchronized onDestructionCallbacks_; }; diff --git a/folly/io/async/test/EventBaseTest.cpp b/folly/io/async/test/EventBaseTest.cpp index 454ece3c..5528dc3e 100644 --- a/folly/io/async/test/EventBaseTest.cpp +++ b/folly/io/async/test/EventBaseTest.cpp @@ -1,4 +1,6 @@ /* + * Copyright 2017-present Facebook, Inc. + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +18,6 @@ * specific language governing permissions and limitations * under the License. */ - #include #include @@ -1736,7 +1737,7 @@ TEST(EventBaseTest, LoopKeepAlive) { EventBase evb; bool done = false; - std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable { + std::thread t([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable { /* sleep override */ std::this_thread::sleep_for( std::chrono::milliseconds(100)); evb.runInEventBaseThread( @@ -1757,7 +1758,7 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) { std::thread t; evb.runInEventBaseThread([&] { - t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable { + t = std::thread([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable { /* sleep override */ std::this_thread::sleep_for( std::chrono::milliseconds(100)); evb.runInEventBaseThread( @@ -1785,9 +1786,9 @@ TEST(EventBaseTest, LoopKeepAliveWithLoopForever) { { auto* ev = evb.get(); - EventBase::LoopKeepAlive keepAlive; + Executor::KeepAlive keepAlive; ev->runInEventBaseThreadAndWait( - [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); }); + [&ev, &keepAlive] { keepAlive = ev->getKeepAliveToken(); }); ASSERT_FALSE(done) << "Loop finished before we asked it to"; ev->terminateLoopSoon(); /* sleep override */ @@ -1807,7 +1808,7 @@ TEST(EventBaseTest, LoopKeepAliveShutdown) { std::thread t([ &done, - loopKeepAlive = evb->loopKeepAlive(), + loopKeepAlive = evb->getKeepAliveToken(), evbPtr = evb.get() ]() mutable { /* sleep override */ std::this_thread::sleep_for( @@ -1839,9 +1840,9 @@ TEST(EventBaseTest, LoopKeepAliveAtomic) { for (size_t i = 0; i < kNumThreads; ++i) { ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] { - std::vector keepAlives; + std::vector keepAlives; for (size_t j = 0; j < kNumTasks; ++j) { - keepAlives.emplace_back(evbPtr->loopKeepAliveAtomic()); + keepAlives.emplace_back(evbPtr->getKeepAliveToken()); } batonPtr->post(); -- 2.34.1