Print expected/actual thread names when running EventBase logic in unexpected thread
authorGiuseppe Ottaviano <ott@fb.com>
Wed, 21 Jun 2017 19:28:52 +0000 (12:28 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Wed, 21 Jun 2017 19:35:32 +0000 (12:35 -0700)
Summary: The existing assertion errors do not give a lot of information to track down which thread is incorrectly using the `EventBase`. Print both the current thread name and the name of the thread running the event base loop.

Reviewed By: luciang

Differential Revision: D5289790

fbshipit-source-id: 563c7f68b7f9b7a6e85e22290d7e81afbf89871e

folly/io/async/AsyncSSLSocket.cpp
folly/io/async/AsyncServerSocket.cpp
folly/io/async/AsyncServerSocket.h
folly/io/async/AsyncSocket.cpp
folly/io/async/AsyncUDPSocket.cpp
folly/io/async/EventBase.cpp
folly/io/async/EventBase.h
folly/io/async/EventBaseLocal.cpp
folly/io/async/EventHandler.cpp
folly/io/async/NotificationQueue.h
folly/io/async/VirtualEventBase.h

index 84354730cc7b3a9c09f490e59ff6a8ae972497e0..5c9ee683205b2d75da3d295fd505ce56588931d6 100644 (file)
@@ -450,7 +450,7 @@ void AsyncSSLSocket::sslAccept(
     std::chrono::milliseconds timeout,
     const SSLContext::SSLVerifyPeerEnum& verifyPeer) {
   DestructorGuard dg(this);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
   verifyPeer_ = verifyPeer;
 
   // Make sure we're in the uninitialized state
@@ -749,7 +749,7 @@ void AsyncSSLSocket::sslConn(
     std::chrono::milliseconds timeout,
     const SSLContext::SSLVerifyPeerEnum& verifyPeer) {
   DestructorGuard dg(this);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   // Cache local and remote socket addresses to keep them available
   // after socket file descriptor is closed.
index 7e02dde7e19ae5d6e796a65fe16bdce698398c50..985f3d3fefea0a230bb2d1cc3b12c85c8864c6e0 100644 (file)
@@ -182,7 +182,9 @@ int AsyncServerSocket::stopAccepting(int shutdownFlags) {
     VLOG(10) << "AsyncServerSocket::stopAccepting " << this <<
               handler.socket_;
   }
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
 
   // When destroy is called, unregister and close the socket immediately.
   accepting_ = false;
@@ -244,7 +246,7 @@ void AsyncServerSocket::destroy() {
 
 void AsyncServerSocket::attachEventBase(EventBase *eventBase) {
   assert(eventBase_ == nullptr);
-  assert(eventBase->isInEventBaseThread());
+  eventBase->dcheckIsInEventBaseThread();
 
   eventBase_ = eventBase;
   for (auto& handler : sockets_) {
@@ -254,7 +256,7 @@ void AsyncServerSocket::attachEventBase(EventBase *eventBase) {
 
 void AsyncServerSocket::detachEventBase() {
   assert(eventBase_ != nullptr);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
   assert(!accepting_);
 
   eventBase_ = nullptr;
@@ -264,7 +266,9 @@ void AsyncServerSocket::detachEventBase() {
 }
 
 void AsyncServerSocket::useExistingSockets(const std::vector<int>& fds) {
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
 
   if (sockets_.size() > 0) {
     throw std::invalid_argument(
@@ -328,7 +332,9 @@ void AsyncServerSocket::bindSocket(
 }
 
 void AsyncServerSocket::bind(const SocketAddress& address) {
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
 
   // useExistingSocket() may have been called to initialize socket_ already.
   // However, in the normal case we need to create a new socket now.
@@ -505,7 +511,9 @@ void AsyncServerSocket::bind(uint16_t port) {
 }
 
 void AsyncServerSocket::listen(int backlog) {
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
 
   // Start listening
   for (auto& handler : sockets_) {
@@ -539,7 +547,9 @@ std::vector<SocketAddress> AsyncServerSocket::getAddresses()
 void AsyncServerSocket::addAcceptCallback(AcceptCallback *callback,
                                            EventBase *eventBase,
                                            uint32_t maxAtOnce) {
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
 
   // If this is the first accept callback and we are supposed to be accepting,
   // start accepting once the callback is installed.
@@ -585,7 +595,9 @@ void AsyncServerSocket::addAcceptCallback(AcceptCallback *callback,
 
 void AsyncServerSocket::removeAcceptCallback(AcceptCallback *callback,
                                               EventBase *eventBase) {
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
 
   // Find the matching AcceptCallback.
   // We just do a simple linear search; we don't expect removeAcceptCallback()
@@ -648,7 +660,9 @@ void AsyncServerSocket::removeAcceptCallback(AcceptCallback *callback,
 }
 
 void AsyncServerSocket::startAccepting() {
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
 
   accepting_ = true;
   if (callbacks_.empty()) {
@@ -666,7 +680,9 @@ void AsyncServerSocket::startAccepting() {
 }
 
 void AsyncServerSocket::pauseAccepting() {
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
   accepting_ = false;
   for (auto& handler : sockets_) {
    handler. unregisterHandler();
@@ -1023,7 +1039,8 @@ void AsyncServerSocket::backoffTimeoutExpired() {
   // the backoff timeout.
   assert(accepting_);
   // We can't be detached from the EventBase without being paused
-  assert(eventBase_ != nullptr && eventBase_->isInEventBaseThread());
+  assert(eventBase_ != nullptr);
+  eventBase_->dcheckIsInEventBaseThread();
 
   // If all of the callbacks were removed, we shouldn't re-enable accepts
   if (callbacks_.empty()) {
index ff2f539c20f455b2de01cc35e74ee1896221c4eb..1d53bccc784b7482558f837260427b7bae456bbf 100644 (file)
@@ -588,7 +588,9 @@ class AsyncServerSocket : public DelayedDestruction
    * socket's primary EventBase.
    */
   int64_t getNumPendingMessagesInQueue() const {
-    assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+    if (eventBase_) {
+      eventBase_->dcheckIsInEventBaseThread();
+    }
     int64_t numMsgs = 0;
     for (const auto& callback : callbacks_) {
       numMsgs += callback.consumer->getQueue()->size();
index 46cc0d3b811be983c429a870f05d6b12099127bd..6cd879d9a567843cd556f40b3d8a9009c638535d 100644 (file)
@@ -266,7 +266,9 @@ AsyncSocket::AsyncSocket(AsyncSocket::UniquePtr oldAsyncSocket)
 // init() method, since constructor forwarding isn't supported in most
 // compilers yet.
 void AsyncSocket::init() {
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
   shutdownFlags_ = 0;
   state_ = StateEnum::UNINIT;
   eventFlags_ = EventHandler::NONE;
@@ -356,7 +358,7 @@ void AsyncSocket::connect(ConnectCallback* callback,
                            const OptionMap &options,
                            const folly::SocketAddress& bindAddr) noexcept {
   DestructorGuard dg(this);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   addr_ = address;
 
@@ -592,7 +594,9 @@ void AsyncSocket::cancelConnect() {
 
 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
   sendTimeout_ = milliseconds;
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
 
   // If we are currently pending on write requests, immediately update
   // writeTimeout_ with the new value.
@@ -628,7 +632,7 @@ void AsyncSocket::setErrMessageCB(ErrMessageCallback* callback) {
   }
 
   DestructorGuard dg(this);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   if (callback == nullptr) {
     // We should be able to reset the callback regardless of the
@@ -714,7 +718,7 @@ void AsyncSocket::setReadCB(ReadCallback *callback) {
   }
 
   DestructorGuard dg(this);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   switch ((StateEnum)state_) {
     case StateEnum::CONNECTING:
@@ -816,7 +820,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
           << ", state=" << state_;
   DestructorGuard dg(this);
   unique_ptr<IOBuf>ioBuf(std::move(buf));
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
     // No new writes may be performed after the write side of the socket has
@@ -966,7 +970,7 @@ void AsyncSocket::close() {
   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
   // destroyed until close() returns.
   DestructorGuard dg(this);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   // Since there are write requests pending, we have to set the
   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
@@ -998,7 +1002,9 @@ void AsyncSocket::closeNow() {
           << ", state=" << state_ << ", shutdownFlags="
           << std::hex << (int) shutdownFlags_;
   DestructorGuard dg(this);
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
 
   switch (state_) {
     case StateEnum::ESTABLISHED:
@@ -1090,7 +1096,7 @@ void AsyncSocket::shutdownWrite() {
     return;
   }
 
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
   // shutdown will be performed once all writes complete.
@@ -1117,7 +1123,9 @@ void AsyncSocket::shutdownWriteNow() {
   }
 
   DestructorGuard dg(this);
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
 
   switch (static_cast<StateEnum>(state_)) {
     case StateEnum::ESTABLISHED:
@@ -1245,7 +1253,7 @@ void AsyncSocket::attachEventBase(EventBase* eventBase) {
           << ", state=" << state_ << ", events="
           << std::hex << eventFlags_ << ")";
   assert(eventBase_ == nullptr);
-  assert(eventBase->isInEventBaseThread());
+  eventBase->dcheckIsInEventBaseThread();
 
   eventBase_ = eventBase;
   ioHandler_.attachEventBase(eventBase);
@@ -1260,7 +1268,7 @@ void AsyncSocket::detachEventBase() {
           << ", old evb=" << eventBase_ << ", state=" << state_
           << ", events=" << std::hex << eventFlags_ << ")";
   assert(eventBase_ != nullptr);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   eventBase_ = nullptr;
   ioHandler_.detachEventBase();
@@ -1272,7 +1280,7 @@ void AsyncSocket::detachEventBase() {
 
 bool AsyncSocket::isDetachable() const {
   DCHECK(eventBase_ != nullptr);
-  DCHECK(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
 }
@@ -1428,7 +1436,7 @@ void AsyncSocket::ioReady(uint16_t events) noexcept {
           << ", events=" << std::hex << events << ", state=" << state_;
   DestructorGuard dg(this);
   assert(events & EventHandler::READ_WRITE);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   uint16_t relevantEvents = uint16_t(events & EventHandler::READ_WRITE);
   EventBase* originalEventBase = eventBase_;
@@ -1973,7 +1981,7 @@ void AsyncSocket::timeoutExpired() noexcept {
   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
   DestructorGuard dg(this);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   if (state_ == StateEnum::CONNECTING) {
     // connect() timed out
@@ -2151,13 +2159,13 @@ AsyncSocket::WriteResult AsyncSocket::performWrite(
  * and call all currently installed callbacks.  After an error, the
  * AsyncSocket is completely unregistered.
  *
- * @return Returns true on succcess, or false on error.
+ * @return Returns true on success, or false on error.
  */
 bool AsyncSocket::updateEventRegistration() {
   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
           << ", events=" << std::hex << eventFlags_;
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
   if (eventFlags_ == EventHandler::NONE) {
     ioHandler_.unregisterHandler();
     return true;
index 5d99e44465aa0584e1223737a30005f94abaebed..43c79c21363e0a45998ccf3a4705b0ab3ea7231a 100644 (file)
@@ -39,7 +39,7 @@ AsyncUDPSocket::AsyncUDPSocket(EventBase* evb)
       eventBase_(evb),
       fd_(-1),
       readCallback_(nullptr) {
-  DCHECK(evb->isInEventBaseThread());
+  evb->dcheckIsInEventBaseThread();
 }
 
 AsyncUDPSocket::~AsyncUDPSocket() {
@@ -201,7 +201,7 @@ void AsyncUDPSocket::pauseRead() {
 }
 
 void AsyncUDPSocket::close() {
-  DCHECK(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   if (readCallback_) {
     auto cob = readCallback_;
index cf8a87fa84b5072db3ef48e1eda081b39402c0fd..6d4a9684eaf47727e77a476d791e34b4ada93d73 100644 (file)
@@ -198,6 +198,23 @@ void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
   fnRunner_->setMaxReadAtOnce(maxAtOnce);
 }
 
+void EventBase::checkIsInEventBaseThread() const {
+  auto evbTid = loopThread_.load(std::memory_order_relaxed);
+  if (evbTid == std::thread::id()) {
+    return;
+  }
+
+  // Using getThreadName(evbTid) instead of name_ will work also if
+  // the thread name is set outside of EventBase (and name_ is empty).
+  auto curTid = std::this_thread::get_id();
+  CHECK(evbTid == curTid)
+      << "This logic must be executed in the event base thread. "
+      << "Event base thread name: \""
+      << folly::getThreadName(evbTid).value_or("")
+      << "\", current thread name: \""
+      << folly::getThreadName(curTid).value_or("") << "\"";
+}
+
 // Set smoothing coefficient for loop load average; input is # of milliseconds
 // for exp(-1) decay.
 void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
@@ -486,7 +503,7 @@ void EventBase::terminateLoopSoon() {
 }
 
 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
-  DCHECK(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   callback->cancelLoopCallback();
   callback->context_ = RequestContext::saveContext();
   if (runOnceCallbacks_ != nullptr && thisIteration) {
@@ -497,7 +514,7 @@ void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
 }
 
 void EventBase::runInLoop(Func cob, bool thisIteration) {
-  DCHECK(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   auto wrapper = new FunctionLoopCallback(std::move(cob));
   wrapper->context_ = RequestContext::saveContext();
   if (runOnceCallbacks_ != nullptr && thisIteration) {
@@ -514,7 +531,7 @@ void EventBase::runOnDestruction(LoopCallback* callback) {
 }
 
 void EventBase::runBeforeLoop(LoopCallback* callback) {
-  DCHECK(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   callback->cancelLoopCallback();
   runBeforeLoopCallbacks_.push_back(*callback);
 }
@@ -697,7 +714,7 @@ void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
 
 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
                                  TimeoutManager::timeout_type timeout) {
-  assert(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   // Set up the timeval and add the event
   struct timeval tv;
   tv.tv_sec = long(timeout.count() / 1000LL);
@@ -713,7 +730,7 @@ bool EventBase::scheduleTimeout(AsyncTimeout* obj,
 }
 
 void EventBase::cancelTimeout(AsyncTimeout* obj) {
-  assert(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   struct event* ev = obj->getEvent();
   if (EventUtil::isEventRegistered(ev)) {
     event_del(ev);
@@ -721,7 +738,7 @@ void EventBase::cancelTimeout(AsyncTimeout* obj) {
 }
 
 void EventBase::setName(const std::string& name) {
-  assert(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   name_ = name;
 
   if (isRunning()) {
@@ -731,7 +748,7 @@ void EventBase::setName(const std::string& name) {
 }
 
 const std::string& EventBase::getName() {
-  assert(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   return name_;
 }
 
index 81fda805ead4578d2bf3e1160c3ded0f26c6352f..0ee54f57c812114f3c27ad2592fbe297cd28cd26 100644 (file)
@@ -492,6 +492,18 @@ class EventBase : private boost::noncopyable,
         std::this_thread::get_id();
   }
 
+  /**
+   * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for
+   * dcheckIsInEventBaseThread), but it prints more information on
+   * failure.
+   */
+  void checkIsInEventBaseThread() const;
+  void dcheckIsInEventBaseThread() const {
+    if (kIsDebug) {
+      checkIsInEventBaseThread();
+    }
+  }
+
   HHWheelTimer& timer() {
     if (!wheelTimer_) {
       wheelTimer_ = HHWheelTimer::newTimer(this);
@@ -641,7 +653,7 @@ class EventBase : private boost::noncopyable,
 
  protected:
   void keepAliveRelease() override {
-    DCHECK(isInEventBaseThread());
+    dcheckIsInEventBaseThread();
     loopKeepAliveCount_--;
   }
 
index 999616fef25acde9080cae8677563a82729d60bd..cf5cf6f86e123f0f23eb399a812a5f177e152a09 100644 (file)
@@ -31,13 +31,13 @@ EventBaseLocalBase::~EventBaseLocalBase() {
 }
 
 void* EventBaseLocalBase::getVoid(EventBase& evb) {
-  DCHECK(evb.isInEventBaseThread());
+  evb.dcheckIsInEventBaseThread();
 
   return folly::get_default(evb.localStorage_, key_, {}).get();
 }
 
 void EventBaseLocalBase::erase(EventBase& evb) {
-  DCHECK(evb.isInEventBaseThread());
+  evb.dcheckIsInEventBaseThread();
 
   evb.localStorage_.erase(key_);
   evb.localStorageToDtor_.erase(this);
@@ -48,7 +48,7 @@ void EventBaseLocalBase::erase(EventBase& evb) {
 }
 
 void EventBaseLocalBase::onEventBaseDestruction(EventBase& evb) {
-  DCHECK(evb.isInEventBaseThread());
+  evb.dcheckIsInEventBaseThread();
 
   SYNCHRONIZED(eventBases_) {
     eventBases_.erase(&evb);
@@ -56,7 +56,7 @@ void EventBaseLocalBase::onEventBaseDestruction(EventBase& evb) {
 }
 
 void EventBaseLocalBase::setVoid(EventBase& evb, std::shared_ptr<void>&& ptr) {
-  DCHECK(evb.isInEventBaseThread());
+  evb.dcheckIsInEventBaseThread();
 
   auto alreadyExists =
     evb.localStorage_.find(key_) != evb.localStorage_.end();
index af1d17e88fbb3af70eb3ff0e1d6428ff366fac8c..72a374002a630b75241b8a07cfcd02d2c7666589 100644 (file)
@@ -110,7 +110,7 @@ void EventHandler::attachEventBase(EventBase* eventBase) {
   assert(event_.ev_base == nullptr);
   assert(!isHandlerRegistered());
   // This must be invoked from the EventBase's thread
-  assert(eventBase->isInEventBaseThread());
+  eventBase->dcheckIsInEventBaseThread();
 
   setEventBase(eventBase);
 }
index 3b7d56d907c33c25ef8ae1e2d939d6c33a27f705..076139f449e4996c07eca71d712d9eb0dbd86c06 100644 (file)
@@ -779,7 +779,7 @@ template<typename MessageT>
 void NotificationQueue<MessageT>::Consumer::init(
     EventBase* eventBase,
     NotificationQueue* queue) {
-  assert(eventBase->isInEventBaseThread());
+  eventBase->dcheckIsInEventBaseThread();
   assert(queue_ == nullptr);
   assert(!isHandlerRegistered());
   queue->checkPid();
index d02fd0e59ea1e16f8bb0d1599c71290e38c29ee5..4e62fb121ba649cef20ca990a06c2909d0c7746b 100644 (file)
@@ -137,7 +137,7 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager {
 
  protected:
   void keepAliveRelease() override {
-    DCHECK(getEventBase().isInEventBaseThread());
+    getEventBase().dcheckIsInEventBaseThread();
     if (loopKeepAliveCountAtomic_.load()) {
       loopKeepAliveCount_ += loopKeepAliveCountAtomic_.exchange(0);
     }