From 8d04c40413bbce86408b05814de44d06fe1f8723 Mon Sep 17 00:00:00 2001 From: Andrii Grynenko Date: Tue, 15 Nov 2016 14:52:48 -0800 Subject: [PATCH] Thread-safe version of loopKeepAlive() Reviewed By: yfeldblum Differential Revision: D4152380 fbshipit-source-id: 8b3c6dc4b14b9138bb5012e05f50496e51c0fa4b --- folly/io/async/EventBase.cpp | 22 ++++++++++--- folly/io/async/EventBase.h | 22 +++++++++++-- folly/io/async/test/EventBaseTest.cpp | 45 +++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 6 deletions(-) diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp index 2474a0e8..4b3bfac2 100644 --- a/folly/io/async/EventBase.cpp +++ b/folly/io/async/EventBase.cpp @@ -176,7 +176,7 @@ EventBase::~EventBase() { // Keep looping until all keep-alive handles are released. Each keep-alive // handle signals that some external code will still schedule some work on // this EventBase (so it's not safe to destroy it). - while (loopKeepAliveCount_ > 0) { + while (loopKeepAliveCount() > 0) { applyLoopKeepAlive(); loopOnce(); } @@ -412,13 +412,22 @@ bool EventBase::loopBody(int flags) { return true; } +ssize_t EventBase::loopKeepAliveCount() { + if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) { + loopKeepAliveCount_ += + loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed); + } + DCHECK_GE(loopKeepAliveCount_, 0); + return loopKeepAliveCount_; +} + void EventBase::applyLoopKeepAlive() { - if (loopKeepAliveActive_ && loopKeepAliveCount_ == 0) { + if (loopKeepAliveActive_ && loopKeepAliveCount() == 0) { // Restore the notification queue internal flag fnRunner_->stopConsuming(); fnRunner_->startConsumingInternal(this, queue_.get()); loopKeepAliveActive_ = false; - } else if (!loopKeepAliveActive_ && loopKeepAliveCount_ > 0) { + } else if (!loopKeepAliveActive_ && loopKeepAliveCount() > 0) { // Update the notification queue event to treat it as a normal // (non-internal) event. The notification queue event always remains // installed, and the main loop won't exit with it installed. @@ -435,7 +444,12 @@ void EventBase::loopForever() { applyLoopKeepAlive(); }; // Make sure notification queue events are treated as normal events. - auto keepAlive = loopKeepAlive(); + // We can't use loopKeepAlive() here since LoopKeepAlive token can only be + // released inside a loop. + ++loopKeepAliveCount_; + SCOPE_EXIT { + --loopKeepAliveCount_; + }; ret = loop(); } diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index a8d4926f..4e5bf78a 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -37,6 +37,7 @@ #include #include #include +#include #include #include #include @@ -555,7 +556,12 @@ class EventBase : private boost::noncopyable, /// Implements the DrivableExecutor interface void drive() override { - auto keepAlive = loopKeepAlive(); + // We can't use loopKeepAlive() here since LoopKeepAlive token can only be + // released inside a loop. + ++loopKeepAliveCount_; + SCOPE_EXIT { + --loopKeepAliveCount_; + }; loopOnce(); } @@ -579,6 +585,15 @@ class EventBase : private boost::noncopyable, return LoopKeepAlive(this); } + // Thread-safe version of loopKeepAlive() + LoopKeepAlive loopKeepAliveAtomic() { + if (inRunningEventBaseThread()) { + return loopKeepAlive(); + } + loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed); + return LoopKeepAlive(this); + } + // TimeoutManager void attachTimeoutManager( AsyncTimeout* obj, @@ -598,6 +613,8 @@ class EventBase : private boost::noncopyable, private: void applyLoopKeepAlive(); + ssize_t loopKeepAliveCount(); + /* * Helper function that tells us whether we have already handled * some event/timeout/callback in this loop iteration. @@ -645,7 +662,8 @@ class EventBase : private boost::noncopyable, // to send function requests to the EventBase thread. std::unique_ptr> queue_; std::unique_ptr fnRunner_; - size_t loopKeepAliveCount_{0}; + ssize_t loopKeepAliveCount_{0}; + std::atomic loopKeepAliveCountAtomic_{0}; bool loopKeepAliveActive_{false}; // limit for latency in microseconds (0 disables) diff --git a/folly/io/async/test/EventBaseTest.cpp b/folly/io/async/test/EventBaseTest.cpp index 06deeebd..524cff7b 100644 --- a/folly/io/async/test/EventBaseTest.cpp +++ b/folly/io/async/test/EventBaseTest.cpp @@ -1826,6 +1826,51 @@ TEST(EventBaseTest, LoopKeepAliveShutdown) { t.join(); } +TEST(EventBaseTest, LoopKeepAliveAtomic) { + auto evb = folly::make_unique(); + + constexpr size_t kNumThreads = 100; + constexpr size_t kNumTasks = 100; + + std::vector ts; + std::vector>> batons; + size_t done{0}; + + for (size_t i = 0; i < kNumThreads; ++i) { + batons.emplace_back(std::make_unique>()); + } + + for (size_t i = 0; i < kNumThreads; ++i) { + ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] { + std::vector keepAlives; + for (size_t i = 0; i < kNumTasks; ++i) { + keepAlives.emplace_back(evbPtr->loopKeepAliveAtomic()); + } + + batonPtr->post(); + + /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1)); + + for (auto& keepAlive : keepAlives) { + evbPtr->runInEventBaseThread( + [&done, keepAlive = std::move(keepAlive) ]() { ++done; }); + } + }); + } + + for (auto& baton : batons) { + baton->wait(); + } + + evb.reset(); + + EXPECT_EQ(kNumThreads * kNumTasks, done); + + for (auto& t : ts) { + t.join(); + } +} + TEST(EventBaseTest, DrivableExecutorTest) { folly::Promise p; auto f = p.getFuture(); -- 2.34.1