From: Joseph Griego Date: Tue, 14 Jun 2016 00:49:38 +0000 (-0700) Subject: EventBase keepAlive counter is not atomic X-Git-Tag: 2016.07.26~144 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=cafe469ac72ad3177dea2a7b130499f444a91233;p=folly.git EventBase keepAlive counter is not atomic Summary: Since loopKeepAlive() is always used from the EventBase thread, there's no need for the overhead of an shared_ptr (and therefore, an atomic ref counter); we can get away without thread safety. This also allows us to discard the (sometimes incorrect) optimization of not returning a handle when it appears the loop will continue running anyways Reviewed By: andriigrynenko Differential Revision: D3375503 fbshipit-source-id: 474e4fcf992bdc4fcca9370d3c57bdcc4e042386 --- diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp index 9d5949e0..db2efcef 100644 --- a/folly/io/async/EventBase.cpp +++ b/folly/io/async/EventBase.cpp @@ -198,7 +198,7 @@ EventBase::~EventBase() { // Keep looping until all keep-alive handles are released. Each keep-alive // handle signals that some external code will still schedule some work on // this EventBase (so it's not safe to destroy it). - while (!loopKeepAlive_.unique()) { + while (loopKeepAliveCount_ > 0) { applyLoopKeepAlive(); loopOnce(); } @@ -448,12 +448,12 @@ bool EventBase::loopBody(int flags) { } void EventBase::applyLoopKeepAlive() { - if (loopKeepAliveActive_ && loopKeepAlive_.unique()) { + if (loopKeepAliveActive_ && loopKeepAliveCount_ == 0) { // Restore the notification queue internal flag fnRunner_->stopConsuming(); fnRunner_->startConsumingInternal(this, queue_.get()); loopKeepAliveActive_ = false; - } else if (!loopKeepAliveActive_ && !loopKeepAlive_.unique()) { + } else if (!loopKeepAliveActive_ && loopKeepAliveCount_ > 0) { // Update the notification queue event to treat it as a normal // (non-internal) event. The notification queue event always remains // installed, and the main loop won't exit with it installed. @@ -468,11 +468,9 @@ void EventBase::loopForever() { { SCOPE_EXIT { applyLoopKeepAlive(); - loopForeverActive_ = false; }; - loopForeverActive_ = true; // Make sure notification queue events are treated as normal events. - auto loopKeepAlive = loopKeepAlive_; + auto keepAlive = loopKeepAlive(); ret = loop(); } diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index 3fcf60e3..89e89fd1 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -587,7 +587,13 @@ class EventBase : private boost::noncopyable, loopOnce(); } - using LoopKeepAlive = std::shared_ptr; + struct LoopKeepAliveDeleter { + void operator()(EventBase* evb) { + DCHECK(evb->isInEventBaseThread()); + evb->loopKeepAliveCount_--; + } + }; + using LoopKeepAlive = std::unique_ptr; /// Returns you a handle which make loop() behave like loopForever() until /// destroyed. loop() will return to its original behavior only when all @@ -596,11 +602,9 @@ class EventBase : private boost::noncopyable, /// /// May return no op LoopKeepAlive if loopForever() is already running. LoopKeepAlive loopKeepAlive() { - if (loopForeverActive_) { - return nullptr; - } else { - return loopKeepAlive_; - } + DCHECK(isInEventBaseThread()); + loopKeepAliveCount_++; + return LoopKeepAlive(this); } private: @@ -692,9 +696,8 @@ class EventBase : private boost::noncopyable, // to send function requests to the EventBase thread. std::unique_ptr> queue_; std::unique_ptr fnRunner_; - LoopKeepAlive loopKeepAlive_{std::make_shared(42)}; + size_t loopKeepAliveCount_{0}; bool loopKeepAliveActive_{false}; - std::atomic loopForeverActive_{false}; // limit for latency in microseconds (0 disables) int64_t maxLatency_; diff --git a/folly/io/async/test/EventBaseTest.cpp b/folly/io/async/test/EventBaseTest.cpp index bc8988ab..d5998bf1 100644 --- a/folly/io/async/test/EventBaseTest.cpp +++ b/folly/io/async/test/EventBaseTest.cpp @@ -1733,7 +1733,7 @@ TEST(EventBaseTest, LoopKeepAlive) { EventBase evb; bool done = false; - std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ] { + std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable { /* sleep override */ std::this_thread::sleep_for( std::chrono::milliseconds(100)); evb.runInEventBaseThread( @@ -1754,7 +1754,7 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) { std::thread t; evb.runInEventBaseThread([&] { - t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ] { + t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable { /* sleep override */ std::this_thread::sleep_for( std::chrono::milliseconds(100)); evb.runInEventBaseThread( @@ -1769,20 +1769,49 @@ TEST(EventBaseTest, LoopKeepAliveInLoop) { t.join(); } +TEST(EventBaseTest, LoopKeepAliveWithLoopForever) { + std::unique_ptr evb = folly::make_unique(); + + bool done = false; + + std::thread evThread([&] { + evb->loopForever(); + evb.reset(); + done = true; + }); + + { + auto* ev = evb.get(); + EventBase::LoopKeepAlive keepAlive; + ev->runInEventBaseThreadAndWait( + [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); }); + ASSERT_FALSE(done) << "Loop finished before we asked it to"; + ev->terminateLoopSoon(); + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + ASSERT_FALSE(done) << "Loop terminated early"; + ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{}); + } + + evThread.join(); + ASSERT_TRUE(done); +} + TEST(EventBaseTest, LoopKeepAliveShutdown) { auto evb = folly::make_unique(); bool done = false; - std::thread t( - [&done, loopKeepAlive = evb->loopKeepAlive(), evbPtr = evb.get() ] { - /* sleep override */ std::this_thread::sleep_for( - std::chrono::milliseconds(100)); - evbPtr->runInEventBaseThread( - [&done, loopKeepAlive = std::move(loopKeepAlive) ] { - done = true; - }); - }); + std::thread t([ + &done, + loopKeepAlive = evb->loopKeepAlive(), + evbPtr = evb.get() + ]() mutable { + /* sleep override */ std::this_thread::sleep_for( + std::chrono::milliseconds(100)); + evbPtr->runInEventBaseThread( + [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; }); + }); evb.reset();