From b981b9f2dbaa2082f3e4794d08d3f57afc4237b4 Mon Sep 17 00:00:00 2001 From: Andrii Grynenko Date: Thu, 17 Nov 2016 12:44:52 -0800 Subject: [PATCH] VirtualEventBase Summary: This implements a view onto an EventBase, which can keep track of all tasks created through it and join them on destruction. Multiple VirtualEventBases can be backed by the same EventBase. This can be useful to have the same IO thread/thread-pool be shared between multiple libraries, allowing them to each have it's own VirtualEventBase and control its lifetime. Since VirtualEventBase also supports LoopKeepAlive and onDestruction functionality, it can be easily integrated with FiberManagerMap. Reviewed By: yfeldblum Differential Revision: D4158719 fbshipit-source-id: 7df964f36e5276e2b5665fd8394ea2e187aa298c --- folly/Makefile.am | 2 + folly/fibers/EventBaseLoopController-inl.h | 30 ++-- folly/fibers/EventBaseLoopController.h | 22 +-- folly/fibers/FiberManagerMap.cpp | 48 +++--- folly/fibers/FiberManagerMap.h | 5 + folly/fibers/test/FibersTest.cpp | 33 ++++ folly/io/async/EventBase.h | 3 + folly/io/async/VirtualEventBase.cpp | 52 +++++++ folly/io/async/VirtualEventBase.h | 172 +++++++++++++++++++++ 9 files changed, 329 insertions(+), 38 deletions(-) create mode 100644 folly/io/async/VirtualEventBase.cpp create mode 100644 folly/io/async/VirtualEventBase.h diff --git a/folly/Makefile.am b/folly/Makefile.am index ba3409b6..b353b1bf 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -234,6 +234,7 @@ nobase_follyinclude_HEADERS = \ io/async/SSLContext.h \ io/async/ScopedEventBaseThread.h \ io/async/TimeoutManager.h \ + io/async/VirtualEventBase.h \ io/async/WriteChainAsyncTransportWrapper.h \ io/async/test/AsyncSSLSocketTest.h \ io/async/test/BlockingSocket.h \ @@ -448,6 +449,7 @@ libfolly_la_SOURCES = \ io/async/Request.cpp \ io/async/SSLContext.cpp \ io/async/ScopedEventBaseThread.cpp \ + io/async/VirtualEventBase.cpp \ io/async/HHWheelTimer.cpp \ io/async/TimeoutManager.cpp \ io/async/test/ScopedBoundPort.cpp \ diff --git a/folly/fibers/EventBaseLoopController-inl.h b/folly/fibers/EventBaseLoopController-inl.h index 89a0ece3..b23fb45c 100644 --- a/folly/fibers/EventBaseLoopController-inl.h +++ b/folly/fibers/EventBaseLoopController-inl.h @@ -19,16 +19,19 @@ namespace folly { namespace fibers { -inline EventBaseLoopController::EventBaseLoopController() +template +inline EventBaseLoopControllerT::EventBaseLoopControllerT() : callback_(*this), aliveWeak_(destructionCallback_.getWeak()) {} -inline EventBaseLoopController::~EventBaseLoopController() { +template +inline EventBaseLoopControllerT::~EventBaseLoopControllerT() { callback_.cancelLoopCallback(); eventBaseKeepAlive_.reset(); } -inline void EventBaseLoopController::attachEventBase( - folly::EventBase& eventBase) { +template +inline void EventBaseLoopControllerT::attachEventBase( + EventBaseT& eventBase) { if (eventBase_ != nullptr) { LOG(ERROR) << "Attempt to reattach EventBase to LoopController"; } @@ -43,11 +46,14 @@ inline void EventBaseLoopController::attachEventBase( } } -inline void EventBaseLoopController::setFiberManager(FiberManager* fm) { +template +inline void EventBaseLoopControllerT::setFiberManager( + FiberManager* fm) { fm_ = fm; } -inline void EventBaseLoopController::schedule() { +template +inline void EventBaseLoopControllerT::schedule() { if (eventBase_ == nullptr) { // In this case we need to postpone scheduling. awaitingScheduling_ = true; @@ -58,11 +64,13 @@ inline void EventBaseLoopController::schedule() { } } -inline void EventBaseLoopController::cancel() { +template +inline void EventBaseLoopControllerT::cancel() { callback_.cancelLoopCallback(); } -inline void EventBaseLoopController::runLoop() { +template +inline void EventBaseLoopControllerT::runLoop() { if (!eventBaseKeepAlive_) { eventBaseKeepAlive_ = eventBase_->loopKeepAlive(); } @@ -76,7 +84,8 @@ inline void EventBaseLoopController::runLoop() { } } -inline void EventBaseLoopController::scheduleThreadSafe( +template +inline void EventBaseLoopControllerT::scheduleThreadSafe( std::function func) { /* The only way we could end up here is if 1) Fiber thread creates a fiber that awaits (which means we must @@ -97,7 +106,8 @@ inline void EventBaseLoopController::scheduleThreadSafe( } } -inline void EventBaseLoopController::timedSchedule( +template +inline void EventBaseLoopControllerT::timedSchedule( std::function func, TimePoint time) { assert(eventBaseAttached_); diff --git a/folly/fibers/EventBaseLoopController.h b/folly/fibers/EventBaseLoopController.h index 80dd6667..fa3f8e80 100644 --- a/folly/fibers/EventBaseLoopController.h +++ b/folly/fibers/EventBaseLoopController.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -28,17 +29,18 @@ class EventBase; namespace folly { namespace fibers { -class EventBaseLoopController : public LoopController { +template +class EventBaseLoopControllerT : public LoopController { public: - explicit EventBaseLoopController(); - ~EventBaseLoopController(); + explicit EventBaseLoopControllerT(); + ~EventBaseLoopControllerT(); /** * Attach EventBase after LoopController was created. */ - void attachEventBase(folly::EventBase& eventBase); + void attachEventBase(EventBaseT& eventBase); - folly::EventBase* getEventBase() { + EventBaseT* getEventBase() { return eventBase_; } @@ -49,7 +51,7 @@ class EventBaseLoopController : public LoopController { private: class ControllerCallback : public folly::EventBase::LoopCallback { public: - explicit ControllerCallback(EventBaseLoopController& controller) + explicit ControllerCallback(EventBaseLoopControllerT& controller) : controller_(controller) {} void runLoopCallback() noexcept override { @@ -57,7 +59,7 @@ class EventBaseLoopController : public LoopController { } private: - EventBaseLoopController& controller_; + EventBaseLoopControllerT& controller_; }; class DestructionCallback : public folly::EventBase::LoopCallback { @@ -90,8 +92,8 @@ class EventBaseLoopController : public LoopController { }; bool awaitingScheduling_{false}; - folly::EventBase* eventBase_{nullptr}; - folly::EventBase::LoopKeepAlive eventBaseKeepAlive_; + EventBaseT* eventBase_{nullptr}; + typename EventBaseT::LoopKeepAlive eventBaseKeepAlive_; ControllerCallback callback_; DestructionCallback destructionCallback_; FiberManager* fm_{nullptr}; @@ -110,6 +112,8 @@ class EventBaseLoopController : public LoopController { friend class FiberManager; }; + +using EventBaseLoopController = EventBaseLoopControllerT; } } // folly::fibers diff --git a/folly/fibers/FiberManagerMap.cpp b/folly/fibers/FiberManagerMap.cpp index 4a45e665..d96fb82e 100644 --- a/folly/fibers/FiberManagerMap.cpp +++ b/folly/fibers/FiberManagerMap.cpp @@ -26,22 +26,24 @@ namespace fibers { namespace { +template class EventBaseOnDestructionCallback : public EventBase::LoopCallback { public: - explicit EventBaseOnDestructionCallback(EventBase& evb) : evb_(evb) {} + explicit EventBaseOnDestructionCallback(EventBaseT& evb) : evb_(evb) {} void runLoopCallback() noexcept override; private: - EventBase& evb_; + EventBaseT& evb_; }; +template class GlobalCache { public: - static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) { + static FiberManager& get(EventBaseT& evb, const FiberManager::Options& opts) { return instance().getImpl(evb, opts); } - static std::unique_ptr erase(EventBase& evb) { + static std::unique_ptr erase(EventBaseT& evb) { return instance().eraseImpl(evb); } @@ -55,15 +57,15 @@ class GlobalCache { return *ret; } - FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) { + FiberManager& getImpl(EventBaseT& evb, const FiberManager::Options& opts) { std::lock_guard lg(mutex_); auto& fmPtrRef = map_[&evb]; if (!fmPtrRef) { - auto loopController = make_unique(); + auto loopController = make_unique>(); loopController->attachEventBase(evb); - evb.runOnDestruction(new EventBaseOnDestructionCallback(evb)); + evb.runOnDestruction(new EventBaseOnDestructionCallback(evb)); fmPtrRef = make_unique(std::move(loopController), opts); } @@ -71,7 +73,7 @@ class GlobalCache { return *fmPtrRef; } - std::unique_ptr eraseImpl(EventBase& evb) { + std::unique_ptr eraseImpl(EventBaseT& evb) { std::lock_guard lg(mutex_); DCHECK_EQ(1, map_.count(&evb)); @@ -82,18 +84,19 @@ class GlobalCache { } std::mutex mutex_; - std::unordered_map> map_; + std::unordered_map> map_; }; constexpr size_t kEraseListMaxSize = 64; +template class ThreadLocalCache { public: - static FiberManager& get(EventBase& evb, const FiberManager::Options& opts) { + static FiberManager& get(EventBaseT& evb, const FiberManager::Options& opts) { return instance()->getImpl(evb, opts); } - static void erase(EventBase& evb) { + static void erase(EventBaseT& evb) { for (auto& localInstance : instance().accessAllThreads()) { SYNCHRONIZED(info, localInstance.eraseInfo_) { if (info.eraseList.size() >= kEraseListMaxSize) { @@ -121,12 +124,12 @@ class ThreadLocalCache { return *ret; } - FiberManager& getImpl(EventBase& evb, const FiberManager::Options& opts) { + FiberManager& getImpl(EventBaseT& evb, const FiberManager::Options& opts) { eraseImpl(); auto& fmPtrRef = map_[&evb]; if (!fmPtrRef) { - fmPtrRef = &GlobalCache::get(evb, opts); + fmPtrRef = &GlobalCache::get(evb, opts); } DCHECK(fmPtrRef != nullptr); @@ -154,21 +157,22 @@ class ThreadLocalCache { } } - std::unordered_map map_; + std::unordered_map map_; std::atomic eraseRequested_{false}; struct EraseInfo { bool eraseAll{false}; - std::vector eraseList; + std::vector eraseList; }; folly::Synchronized eraseInfo_; }; -void EventBaseOnDestructionCallback::runLoopCallback() noexcept { - auto fm = GlobalCache::erase(evb_); +template +void EventBaseOnDestructionCallback::runLoopCallback() noexcept { + auto fm = GlobalCache::erase(evb_); DCHECK(fm.get() != nullptr); - ThreadLocalCache::erase(evb_); + ThreadLocalCache::erase(evb_); delete this; } @@ -178,7 +182,13 @@ void EventBaseOnDestructionCallback::runLoopCallback() noexcept { FiberManager& getFiberManager( EventBase& evb, const FiberManager::Options& opts) { - return ThreadLocalCache::get(evb, opts); + return ThreadLocalCache::get(evb, opts); +} + +FiberManager& getFiberManager( + VirtualEventBase& evb, + const FiberManager::Options& opts) { + return ThreadLocalCache::get(evb, opts); } } } diff --git a/folly/fibers/FiberManagerMap.h b/folly/fibers/FiberManagerMap.h index d76bcd6b..b61c088a 100644 --- a/folly/fibers/FiberManagerMap.h +++ b/folly/fibers/FiberManagerMap.h @@ -17,6 +17,7 @@ #include #include +#include namespace folly { namespace fibers { @@ -24,5 +25,9 @@ namespace fibers { FiberManager& getFiberManager( folly::EventBase& evb, const FiberManager::Options& opts = FiberManager::Options()); + +FiberManager& getFiberManager( + folly::VirtualEventBase& evb, + const FiberManager::Options& opts = FiberManager::Options()); } } diff --git a/folly/fibers/test/FibersTest.cpp b/folly/fibers/test/FibersTest.cpp index d11ddb30..9a828ad4 100644 --- a/folly/fibers/test/FibersTest.cpp +++ b/folly/fibers/test/FibersTest.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include using namespace folly::fibers; @@ -2041,6 +2042,38 @@ TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) { validateResults(results, COUNT); } +TEST(FiberManager, VirtualEventBase) { + folly::ScopedEventBaseThread thread; + + auto evb1 = + folly::make_unique(*thread.getEventBase()); + auto evb2 = + folly::make_unique(*thread.getEventBase()); + + bool done1{false}; + bool done2{false}; + + getFiberManager(*evb1).addTaskRemote([&] { + Baton baton; + baton.timed_wait(std::chrono::milliseconds{100}); + + done1 = true; + }); + + getFiberManager(*evb2).addTaskRemote([&] { + Baton baton; + baton.timed_wait(std::chrono::milliseconds{200}); + + done2 = true; + }); + + evb1.reset(); + EXPECT_TRUE(done1); + + evb2.reset(); + EXPECT_TRUE(done2); +} + /** * Test that we can properly track fiber stack usage. * diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index 4e5bf78a..3320bf26 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -102,6 +102,8 @@ class RequestEventBase : public RequestData { static constexpr const char* kContextDataName{"EventBase"}; }; +class VirtualEventBase; + /** * This class is a wrapper for all asynchronous I/O processing functionality * @@ -159,6 +161,7 @@ class EventBase : private boost::noncopyable, // EventBase needs access to LoopCallbackList (and therefore to hook_) friend class EventBase; + friend class VirtualEventBase; std::shared_ptr context_; }; diff --git a/folly/io/async/VirtualEventBase.cpp b/folly/io/async/VirtualEventBase.cpp new file mode 100644 index 00000000..c504cbc1 --- /dev/null +++ b/folly/io/async/VirtualEventBase.cpp @@ -0,0 +1,52 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +namespace folly { + +VirtualEventBase::VirtualEventBase(EventBase& evb) : evb_(evb) { + evbLoopKeepAlive_ = evb_.loopKeepAliveAtomic(); + loopKeepAlive_ = loopKeepAliveAtomic(); +} + +VirtualEventBase::~VirtualEventBase() { + CHECK(!evb_.inRunningEventBaseThread()); + + CHECK(evb_.runInEventBaseThread([&] { loopKeepAlive_.reset(); })); + loopKeepAliveBaton_.wait(); + + CHECK(evb_.runInEventBaseThreadAndWait([&] { + clearCobTimeouts(); + + onDestructionCallbacks_.withWLock([&](LoopCallbackList& callbacks) { + while (!callbacks.empty()) { + auto& callback = callbacks.front(); + callbacks.pop_front(); + callback.runLoopCallback(); + } + }); + + evbLoopKeepAlive_.reset(); + })); +} + +void VirtualEventBase::runOnDestruction(EventBase::LoopCallback* callback) { + onDestructionCallbacks_.withWLock([&](LoopCallbackList& callbacks) { + callback->cancelLoopCallback(); + callbacks.push_back(*callback); + }); +} +} diff --git a/folly/io/async/VirtualEventBase.h b/folly/io/async/VirtualEventBase.h new file mode 100644 index 00000000..c26f92cf --- /dev/null +++ b/folly/io/async/VirtualEventBase.h @@ -0,0 +1,172 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace folly { + +/** + * VirtualEventBase implements a light-weight view onto existing EventBase. + * + * Multiple VirtualEventBases can be backed by a single EventBase. Similarly + * to EventBase, VirtualEventBase implements loopKeepAlive() functionality, + * which allows callbacks holding LoopKeepAlive token to keep EventBase looping + * until they are complete. + * + * VirtualEventBase destructor blocks until all its KeepAliveTokens are released + * and all tasks scheduled through it are complete. EventBase destructor also + * blocks until all VirtualEventBases backed by it are released. + */ +class VirtualEventBase : public folly::Executor, public folly::TimeoutManager { + public: + explicit VirtualEventBase(EventBase& evb); + + VirtualEventBase(const VirtualEventBase&) = delete; + VirtualEventBase& operator=(const VirtualEventBase&) = delete; + + ~VirtualEventBase(); + + EventBase& getEventBase() { + return evb_; + } + + /** + * Adds the given callback to a queue of things run before destruction + * of current VirtualEventBase. + * + * This allows users of VirtualEventBase that run in it, but don't control it, + * to be notified before VirtualEventBase gets destructed. + * + * Note: this will be called from the loop of the EventBase, backing this + * VirtualEventBase + */ + void runOnDestruction(EventBase::LoopCallback* callback); + + /** + * @see EventBase::runInLoop + */ + template + void runInLoop(F&& f, bool thisIteration = false) { + evb_.runInLoop(std::forward(f), thisIteration); + } + + /** + * VirtualEventBase destructor blocks until all tasks scheduled through its + * runInEventBaseThread are complete. + * + * @see EventBase::runInEventBaseThread + */ + template + void runInEventBaseThread(F&& f) { + // LoopKeepAlive token has to be released in the EventBase thread. If + // runInEventBaseThread() fails, we can't extract the LoopKeepAlive token + // from the callback to properly release it. + CHECK(evb_.runInEventBaseThread([ + keepAlive = loopKeepAliveAtomic(), + f = std::forward(f) + ]() mutable { f(); })); + } + + void attachTimeoutManager( + AsyncTimeout* obj, + TimeoutManager::InternalEnum internal) override { + evb_.attachTimeoutManager(obj, internal); + } + + void detachTimeoutManager(AsyncTimeout* obj) override { + evb_.detachTimeoutManager(obj); + } + + bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout) + override { + return evb_.scheduleTimeout(obj, timeout); + } + + void cancelTimeout(AsyncTimeout* obj) override { + evb_.cancelTimeout(obj); + } + + void bumpHandlingTime() override { + evb_.bumpHandlingTime(); + } + + bool isInTimeoutManagerThread() override { + return evb_.isInTimeoutManagerThread(); + } + + /** + * @see runInEventBaseThread + */ + void add(folly::Func f) override { + runInEventBaseThread(std::move(f)); + } + + struct LoopKeepAliveDeleter { + void operator()(VirtualEventBase* evb) { + DCHECK(evb->getEventBase().inRunningEventBaseThread()); + if (evb->loopKeepAliveCountAtomic_.load()) { + evb->loopKeepAliveCount_ += evb->loopKeepAliveCountAtomic_.exchange(0); + } + DCHECK(evb->loopKeepAliveCount_ > 0); + if (--evb->loopKeepAliveCount_ == 0) { + evb->loopKeepAliveBaton_.post(); + } + } + }; + using LoopKeepAlive = std::unique_ptr; + + /** + * Returns you a handle which prevents VirtualEventBase from being destroyed. + * LoopKeepAlive handle can be released from EventBase loop only. + * + * loopKeepAlive() can be called from EventBase thread only. + */ + LoopKeepAlive loopKeepAlive() { + DCHECK(evb_.isInEventBaseThread()); + ++loopKeepAliveCount_; + return LoopKeepAlive(this); + } + + /** + * Thread-safe version of loopKeepAlive() + */ + LoopKeepAlive loopKeepAliveAtomic() { + if (evb_.inRunningEventBaseThread()) { + return loopKeepAlive(); + } + ++loopKeepAliveCountAtomic_; + return LoopKeepAlive(this); + } + + private: + using LoopCallbackList = EventBase::LoopCallback::List; + + EventBase& evb_; + + ssize_t loopKeepAliveCount_{0}; + std::atomic loopKeepAliveCountAtomic_{0}; + folly::Baton<> loopKeepAliveBaton_; + LoopKeepAlive loopKeepAlive_; + + EventBase::LoopKeepAlive evbLoopKeepAlive_; + + folly::Synchronized onDestructionCallbacks_; +}; +} -- 2.34.1