From fc84e02a134e25cb8f417f8f9d8629e00bfb48e7 Mon Sep 17 00:00:00 2001 From: Giuseppe Ottaviano Date: Wed, 21 Jun 2017 12:28:52 -0700 Subject: [PATCH] Print expected/actual thread names when running EventBase logic in unexpected thread Summary: The existing assertion errors do not give a lot of information to track down which thread is incorrectly using the `EventBase`. Print both the current thread name and the name of the thread running the event base loop. Reviewed By: luciang Differential Revision: D5289790 fbshipit-source-id: 563c7f68b7f9b7a6e85e22290d7e81afbf89871e --- folly/io/async/AsyncSSLSocket.cpp | 4 +-- folly/io/async/AsyncServerSocket.cpp | 39 ++++++++++++++++++-------- folly/io/async/AsyncServerSocket.h | 4 ++- folly/io/async/AsyncSocket.cpp | 42 +++++++++++++++++----------- folly/io/async/AsyncUDPSocket.cpp | 4 +-- folly/io/async/EventBase.cpp | 31 +++++++++++++++----- folly/io/async/EventBase.h | 14 +++++++++- folly/io/async/EventBaseLocal.cpp | 8 +++--- folly/io/async/EventHandler.cpp | 2 +- folly/io/async/NotificationQueue.h | 2 +- folly/io/async/VirtualEventBase.h | 2 +- 11 files changed, 104 insertions(+), 48 deletions(-) diff --git a/folly/io/async/AsyncSSLSocket.cpp b/folly/io/async/AsyncSSLSocket.cpp index 84354730..5c9ee683 100644 --- a/folly/io/async/AsyncSSLSocket.cpp +++ b/folly/io/async/AsyncSSLSocket.cpp @@ -450,7 +450,7 @@ void AsyncSSLSocket::sslAccept( std::chrono::milliseconds timeout, const SSLContext::SSLVerifyPeerEnum& verifyPeer) { DestructorGuard dg(this); - assert(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); verifyPeer_ = verifyPeer; // Make sure we're in the uninitialized state @@ -749,7 +749,7 @@ void AsyncSSLSocket::sslConn( std::chrono::milliseconds timeout, const SSLContext::SSLVerifyPeerEnum& verifyPeer) { DestructorGuard dg(this); - assert(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); // Cache local and remote socket addresses to keep them available // after socket file descriptor is closed. diff --git a/folly/io/async/AsyncServerSocket.cpp b/folly/io/async/AsyncServerSocket.cpp index 7e02dde7..985f3d3f 100644 --- a/folly/io/async/AsyncServerSocket.cpp +++ b/folly/io/async/AsyncServerSocket.cpp @@ -182,7 +182,9 @@ int AsyncServerSocket::stopAccepting(int shutdownFlags) { VLOG(10) << "AsyncServerSocket::stopAccepting " << this << handler.socket_; } - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } // When destroy is called, unregister and close the socket immediately. accepting_ = false; @@ -244,7 +246,7 @@ void AsyncServerSocket::destroy() { void AsyncServerSocket::attachEventBase(EventBase *eventBase) { assert(eventBase_ == nullptr); - assert(eventBase->isInEventBaseThread()); + eventBase->dcheckIsInEventBaseThread(); eventBase_ = eventBase; for (auto& handler : sockets_) { @@ -254,7 +256,7 @@ void AsyncServerSocket::attachEventBase(EventBase *eventBase) { void AsyncServerSocket::detachEventBase() { assert(eventBase_ != nullptr); - assert(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); assert(!accepting_); eventBase_ = nullptr; @@ -264,7 +266,9 @@ void AsyncServerSocket::detachEventBase() { } void AsyncServerSocket::useExistingSockets(const std::vector& fds) { - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } if (sockets_.size() > 0) { throw std::invalid_argument( @@ -328,7 +332,9 @@ void AsyncServerSocket::bindSocket( } void AsyncServerSocket::bind(const SocketAddress& address) { - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } // useExistingSocket() may have been called to initialize socket_ already. // However, in the normal case we need to create a new socket now. @@ -505,7 +511,9 @@ void AsyncServerSocket::bind(uint16_t port) { } void AsyncServerSocket::listen(int backlog) { - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } // Start listening for (auto& handler : sockets_) { @@ -539,7 +547,9 @@ std::vector AsyncServerSocket::getAddresses() void AsyncServerSocket::addAcceptCallback(AcceptCallback *callback, EventBase *eventBase, uint32_t maxAtOnce) { - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } // If this is the first accept callback and we are supposed to be accepting, // start accepting once the callback is installed. @@ -585,7 +595,9 @@ void AsyncServerSocket::addAcceptCallback(AcceptCallback *callback, void AsyncServerSocket::removeAcceptCallback(AcceptCallback *callback, EventBase *eventBase) { - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } // Find the matching AcceptCallback. // We just do a simple linear search; we don't expect removeAcceptCallback() @@ -648,7 +660,9 @@ void AsyncServerSocket::removeAcceptCallback(AcceptCallback *callback, } void AsyncServerSocket::startAccepting() { - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } accepting_ = true; if (callbacks_.empty()) { @@ -666,7 +680,9 @@ void AsyncServerSocket::startAccepting() { } void AsyncServerSocket::pauseAccepting() { - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } accepting_ = false; for (auto& handler : sockets_) { handler. unregisterHandler(); @@ -1023,7 +1039,8 @@ void AsyncServerSocket::backoffTimeoutExpired() { // the backoff timeout. assert(accepting_); // We can't be detached from the EventBase without being paused - assert(eventBase_ != nullptr && eventBase_->isInEventBaseThread()); + assert(eventBase_ != nullptr); + eventBase_->dcheckIsInEventBaseThread(); // If all of the callbacks were removed, we shouldn't re-enable accepts if (callbacks_.empty()) { diff --git a/folly/io/async/AsyncServerSocket.h b/folly/io/async/AsyncServerSocket.h index ff2f539c..1d53bccc 100644 --- a/folly/io/async/AsyncServerSocket.h +++ b/folly/io/async/AsyncServerSocket.h @@ -588,7 +588,9 @@ class AsyncServerSocket : public DelayedDestruction * socket's primary EventBase. */ int64_t getNumPendingMessagesInQueue() const { - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } int64_t numMsgs = 0; for (const auto& callback : callbacks_) { numMsgs += callback.consumer->getQueue()->size(); diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp index 46cc0d3b..6cd879d9 100644 --- a/folly/io/async/AsyncSocket.cpp +++ b/folly/io/async/AsyncSocket.cpp @@ -266,7 +266,9 @@ AsyncSocket::AsyncSocket(AsyncSocket::UniquePtr oldAsyncSocket) // init() method, since constructor forwarding isn't supported in most // compilers yet. void AsyncSocket::init() { - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } shutdownFlags_ = 0; state_ = StateEnum::UNINIT; eventFlags_ = EventHandler::NONE; @@ -356,7 +358,7 @@ void AsyncSocket::connect(ConnectCallback* callback, const OptionMap &options, const folly::SocketAddress& bindAddr) noexcept { DestructorGuard dg(this); - assert(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); addr_ = address; @@ -592,7 +594,9 @@ void AsyncSocket::cancelConnect() { void AsyncSocket::setSendTimeout(uint32_t milliseconds) { sendTimeout_ = milliseconds; - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } // If we are currently pending on write requests, immediately update // writeTimeout_ with the new value. @@ -628,7 +632,7 @@ void AsyncSocket::setErrMessageCB(ErrMessageCallback* callback) { } DestructorGuard dg(this); - assert(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); if (callback == nullptr) { // We should be able to reset the callback regardless of the @@ -714,7 +718,7 @@ void AsyncSocket::setReadCB(ReadCallback *callback) { } DestructorGuard dg(this); - assert(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); switch ((StateEnum)state_) { case StateEnum::CONNECTING: @@ -816,7 +820,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, << ", state=" << state_; DestructorGuard dg(this); unique_ptrioBuf(std::move(buf)); - assert(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) { // No new writes may be performed after the write side of the socket has @@ -966,7 +970,7 @@ void AsyncSocket::close() { // Declare a DestructorGuard to ensure that the AsyncSocket cannot be // destroyed until close() returns. DestructorGuard dg(this); - assert(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); // Since there are write requests pending, we have to set the // SHUT_WRITE_PENDING flag, and wait to perform the real close until the @@ -998,7 +1002,9 @@ void AsyncSocket::closeNow() { << ", state=" << state_ << ", shutdownFlags=" << std::hex << (int) shutdownFlags_; DestructorGuard dg(this); - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } switch (state_) { case StateEnum::ESTABLISHED: @@ -1090,7 +1096,7 @@ void AsyncSocket::shutdownWrite() { return; } - assert(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); // There are pending writes. Set SHUT_WRITE_PENDING so that the actual // shutdown will be performed once all writes complete. @@ -1117,7 +1123,9 @@ void AsyncSocket::shutdownWriteNow() { } DestructorGuard dg(this); - assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread()); + if (eventBase_) { + eventBase_->dcheckIsInEventBaseThread(); + } switch (static_cast(state_)) { case StateEnum::ESTABLISHED: @@ -1245,7 +1253,7 @@ void AsyncSocket::attachEventBase(EventBase* eventBase) { << ", state=" << state_ << ", events=" << std::hex << eventFlags_ << ")"; assert(eventBase_ == nullptr); - assert(eventBase->isInEventBaseThread()); + eventBase->dcheckIsInEventBaseThread(); eventBase_ = eventBase; ioHandler_.attachEventBase(eventBase); @@ -1260,7 +1268,7 @@ void AsyncSocket::detachEventBase() { << ", old evb=" << eventBase_ << ", state=" << state_ << ", events=" << std::hex << eventFlags_ << ")"; assert(eventBase_ != nullptr); - assert(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); eventBase_ = nullptr; ioHandler_.detachEventBase(); @@ -1272,7 +1280,7 @@ void AsyncSocket::detachEventBase() { bool AsyncSocket::isDetachable() const { DCHECK(eventBase_ != nullptr); - DCHECK(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled(); } @@ -1428,7 +1436,7 @@ void AsyncSocket::ioReady(uint16_t events) noexcept { << ", events=" << std::hex << events << ", state=" << state_; DestructorGuard dg(this); assert(events & EventHandler::READ_WRITE); - assert(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); uint16_t relevantEvents = uint16_t(events & EventHandler::READ_WRITE); EventBase* originalEventBase = eventBase_; @@ -1973,7 +1981,7 @@ void AsyncSocket::timeoutExpired() noexcept { VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: " << "state=" << state_ << ", events=" << std::hex << eventFlags_; DestructorGuard dg(this); - assert(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); if (state_ == StateEnum::CONNECTING) { // connect() timed out @@ -2151,13 +2159,13 @@ AsyncSocket::WriteResult AsyncSocket::performWrite( * and call all currently installed callbacks. After an error, the * AsyncSocket is completely unregistered. * - * @return Returns true on succcess, or false on error. + * @return Returns true on success, or false on error. */ bool AsyncSocket::updateEventRegistration() { VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_ << ", events=" << std::hex << eventFlags_; - assert(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); if (eventFlags_ == EventHandler::NONE) { ioHandler_.unregisterHandler(); return true; diff --git a/folly/io/async/AsyncUDPSocket.cpp b/folly/io/async/AsyncUDPSocket.cpp index 5d99e444..43c79c21 100644 --- a/folly/io/async/AsyncUDPSocket.cpp +++ b/folly/io/async/AsyncUDPSocket.cpp @@ -39,7 +39,7 @@ AsyncUDPSocket::AsyncUDPSocket(EventBase* evb) eventBase_(evb), fd_(-1), readCallback_(nullptr) { - DCHECK(evb->isInEventBaseThread()); + evb->dcheckIsInEventBaseThread(); } AsyncUDPSocket::~AsyncUDPSocket() { @@ -201,7 +201,7 @@ void AsyncUDPSocket::pauseRead() { } void AsyncUDPSocket::close() { - DCHECK(eventBase_->isInEventBaseThread()); + eventBase_->dcheckIsInEventBaseThread(); if (readCallback_) { auto cob = readCallback_; diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp index cf8a87fa..6d4a9684 100644 --- a/folly/io/async/EventBase.cpp +++ b/folly/io/async/EventBase.cpp @@ -198,6 +198,23 @@ void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) { fnRunner_->setMaxReadAtOnce(maxAtOnce); } +void EventBase::checkIsInEventBaseThread() const { + auto evbTid = loopThread_.load(std::memory_order_relaxed); + if (evbTid == std::thread::id()) { + return; + } + + // Using getThreadName(evbTid) instead of name_ will work also if + // the thread name is set outside of EventBase (and name_ is empty). + auto curTid = std::this_thread::get_id(); + CHECK(evbTid == curTid) + << "This logic must be executed in the event base thread. " + << "Event base thread name: \"" + << folly::getThreadName(evbTid).value_or("") + << "\", current thread name: \"" + << folly::getThreadName(curTid).value_or("") << "\""; +} + // Set smoothing coefficient for loop load average; input is # of milliseconds // for exp(-1) decay. void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) { @@ -486,7 +503,7 @@ void EventBase::terminateLoopSoon() { } void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) { - DCHECK(isInEventBaseThread()); + dcheckIsInEventBaseThread(); callback->cancelLoopCallback(); callback->context_ = RequestContext::saveContext(); if (runOnceCallbacks_ != nullptr && thisIteration) { @@ -497,7 +514,7 @@ void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) { } void EventBase::runInLoop(Func cob, bool thisIteration) { - DCHECK(isInEventBaseThread()); + dcheckIsInEventBaseThread(); auto wrapper = new FunctionLoopCallback(std::move(cob)); wrapper->context_ = RequestContext::saveContext(); if (runOnceCallbacks_ != nullptr && thisIteration) { @@ -514,7 +531,7 @@ void EventBase::runOnDestruction(LoopCallback* callback) { } void EventBase::runBeforeLoop(LoopCallback* callback) { - DCHECK(isInEventBaseThread()); + dcheckIsInEventBaseThread(); callback->cancelLoopCallback(); runBeforeLoopCallbacks_.push_back(*callback); } @@ -697,7 +714,7 @@ void EventBase::detachTimeoutManager(AsyncTimeout* obj) { bool EventBase::scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout) { - assert(isInEventBaseThread()); + dcheckIsInEventBaseThread(); // Set up the timeval and add the event struct timeval tv; tv.tv_sec = long(timeout.count() / 1000LL); @@ -713,7 +730,7 @@ bool EventBase::scheduleTimeout(AsyncTimeout* obj, } void EventBase::cancelTimeout(AsyncTimeout* obj) { - assert(isInEventBaseThread()); + dcheckIsInEventBaseThread(); struct event* ev = obj->getEvent(); if (EventUtil::isEventRegistered(ev)) { event_del(ev); @@ -721,7 +738,7 @@ void EventBase::cancelTimeout(AsyncTimeout* obj) { } void EventBase::setName(const std::string& name) { - assert(isInEventBaseThread()); + dcheckIsInEventBaseThread(); name_ = name; if (isRunning()) { @@ -731,7 +748,7 @@ void EventBase::setName(const std::string& name) { } const std::string& EventBase::getName() { - assert(isInEventBaseThread()); + dcheckIsInEventBaseThread(); return name_; } diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index 81fda805..0ee54f57 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -492,6 +492,18 @@ class EventBase : private boost::noncopyable, std::this_thread::get_id(); } + /** + * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for + * dcheckIsInEventBaseThread), but it prints more information on + * failure. + */ + void checkIsInEventBaseThread() const; + void dcheckIsInEventBaseThread() const { + if (kIsDebug) { + checkIsInEventBaseThread(); + } + } + HHWheelTimer& timer() { if (!wheelTimer_) { wheelTimer_ = HHWheelTimer::newTimer(this); @@ -641,7 +653,7 @@ class EventBase : private boost::noncopyable, protected: void keepAliveRelease() override { - DCHECK(isInEventBaseThread()); + dcheckIsInEventBaseThread(); loopKeepAliveCount_--; } diff --git a/folly/io/async/EventBaseLocal.cpp b/folly/io/async/EventBaseLocal.cpp index 999616fe..cf5cf6f8 100644 --- a/folly/io/async/EventBaseLocal.cpp +++ b/folly/io/async/EventBaseLocal.cpp @@ -31,13 +31,13 @@ EventBaseLocalBase::~EventBaseLocalBase() { } void* EventBaseLocalBase::getVoid(EventBase& evb) { - DCHECK(evb.isInEventBaseThread()); + evb.dcheckIsInEventBaseThread(); return folly::get_default(evb.localStorage_, key_, {}).get(); } void EventBaseLocalBase::erase(EventBase& evb) { - DCHECK(evb.isInEventBaseThread()); + evb.dcheckIsInEventBaseThread(); evb.localStorage_.erase(key_); evb.localStorageToDtor_.erase(this); @@ -48,7 +48,7 @@ void EventBaseLocalBase::erase(EventBase& evb) { } void EventBaseLocalBase::onEventBaseDestruction(EventBase& evb) { - DCHECK(evb.isInEventBaseThread()); + evb.dcheckIsInEventBaseThread(); SYNCHRONIZED(eventBases_) { eventBases_.erase(&evb); @@ -56,7 +56,7 @@ void EventBaseLocalBase::onEventBaseDestruction(EventBase& evb) { } void EventBaseLocalBase::setVoid(EventBase& evb, std::shared_ptr&& ptr) { - DCHECK(evb.isInEventBaseThread()); + evb.dcheckIsInEventBaseThread(); auto alreadyExists = evb.localStorage_.find(key_) != evb.localStorage_.end(); diff --git a/folly/io/async/EventHandler.cpp b/folly/io/async/EventHandler.cpp index af1d17e8..72a37400 100644 --- a/folly/io/async/EventHandler.cpp +++ b/folly/io/async/EventHandler.cpp @@ -110,7 +110,7 @@ void EventHandler::attachEventBase(EventBase* eventBase) { assert(event_.ev_base == nullptr); assert(!isHandlerRegistered()); // This must be invoked from the EventBase's thread - assert(eventBase->isInEventBaseThread()); + eventBase->dcheckIsInEventBaseThread(); setEventBase(eventBase); } diff --git a/folly/io/async/NotificationQueue.h b/folly/io/async/NotificationQueue.h index 3b7d56d9..076139f4 100644 --- a/folly/io/async/NotificationQueue.h +++ b/folly/io/async/NotificationQueue.h @@ -779,7 +779,7 @@ template void NotificationQueue::Consumer::init( EventBase* eventBase, NotificationQueue* queue) { - assert(eventBase->isInEventBaseThread()); + eventBase->dcheckIsInEventBaseThread(); assert(queue_ == nullptr); assert(!isHandlerRegistered()); queue->checkPid(); diff --git a/folly/io/async/VirtualEventBase.h b/folly/io/async/VirtualEventBase.h index d02fd0e5..4e62fb12 100644 --- a/folly/io/async/VirtualEventBase.h +++ b/folly/io/async/VirtualEventBase.h @@ -137,7 +137,7 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager { protected: void keepAliveRelease() override { - DCHECK(getEventBase().isInEventBaseThread()); + getEventBase().dcheckIsInEventBaseThread(); if (loopKeepAliveCountAtomic_.load()) { loopKeepAliveCount_ += loopKeepAliveCountAtomic_.exchange(0); } -- 2.34.1