Fix RequestContext held too long issue in EventBase
[folly.git] / folly / io / async / EventBase.cpp
index d0d58def06e20e4d5411155b9bff918680612c4d..c1ed73365413b866f367d2b4d5504012cb6dfb72 100644 (file)
 #endif
 
 #include <folly/io/async/EventBase.h>
-#include <folly/io/async/VirtualEventBase.h>
-
-#include <folly/Memory.h>
-#include <folly/ThreadName.h>
-#include <folly/io/async/NotificationQueue.h>
-#include <folly/portability/Unistd.h>
 
-#include <condition_variable>
 #include <fcntl.h>
+
+#include <memory>
 #include <mutex>
 #include <thread>
 
+#include <folly/Baton.h>
+#include <folly/Memory.h>
+#include <folly/io/async/NotificationQueue.h>
+#include <folly/io/async/VirtualEventBase.h>
+#include <folly/portability/Unistd.h>
+#include <folly/system/ThreadName.h>
+
 namespace folly {
 
 /*
@@ -94,7 +96,7 @@ EventBase::EventBase(bool enableTimeMeasurement)
     // The value 'current_base' (libevent 1) or
     // 'event_global_current_base_' (libevent 2) is filled in by event_set(),
     // allowing examination of its value without an explicit reference here.
-    // If ev.ev_base is NULL, then event_init() must be called, otherwise
+    // If ev.ev_base is nullptr, then event_init() must be called, otherwise
     // call event_base_new().
     event_set(&ev, 0, 0, nullptr, nullptr);
     if (!ev.ev_base) {
@@ -112,7 +114,6 @@ EventBase::EventBase(bool enableTimeMeasurement)
   }
   VLOG(5) << "EventBase(): Created.";
   initNotificationQueue();
-  RequestContext::saveContext();
 }
 
 // takes ownership of the event_base
@@ -138,7 +139,6 @@ EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
   }
   initNotificationQueue();
-  RequestContext::saveContext();
 }
 
 EventBase::~EventBase() {
@@ -198,6 +198,23 @@ void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
   fnRunner_->setMaxReadAtOnce(maxAtOnce);
 }
 
+void EventBase::checkIsInEventBaseThread() const {
+  auto evbTid = loopThread_.load(std::memory_order_relaxed);
+  if (evbTid == std::thread::id()) {
+    return;
+  }
+
+  // Using getThreadName(evbTid) instead of name_ will work also if
+  // the thread name is set outside of EventBase (and name_ is empty).
+  auto curTid = std::this_thread::get_id();
+  CHECK(evbTid == curTid)
+      << "This logic must be executed in the event base thread. "
+      << "Event base thread name: \""
+      << folly::getThreadName(evbTid).value_or("")
+      << "\", current thread name: \""
+      << folly::getThreadName(curTid).value_or("") << "\"";
+}
+
 // Set smoothing coefficient for loop load average; input is # of milliseconds
 // for exp(-1) decay.
 void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
@@ -486,7 +503,7 @@ void EventBase::terminateLoopSoon() {
 }
 
 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
-  DCHECK(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   callback->cancelLoopCallback();
   callback->context_ = RequestContext::saveContext();
   if (runOnceCallbacks_ != nullptr && thisIteration) {
@@ -497,7 +514,7 @@ void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
 }
 
 void EventBase::runInLoop(Func cob, bool thisIteration) {
-  DCHECK(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   auto wrapper = new FunctionLoopCallback(std::move(cob));
   wrapper->context_ = RequestContext::saveContext();
   if (runOnceCallbacks_ != nullptr && thisIteration) {
@@ -514,7 +531,7 @@ void EventBase::runOnDestruction(LoopCallback* callback) {
 }
 
 void EventBase::runBeforeLoop(LoopCallback* callback) {
-  DCHECK(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   callback->cancelLoopCallback();
   runBeforeLoopCallbacks_.push_back(*callback);
 }
@@ -534,7 +551,6 @@ bool EventBase::runInEventBaseThread(Func fn) {
   if (inRunningEventBaseThread()) {
     runInLoop(std::move(fn));
     return true;
-
   }
 
   try {
@@ -548,34 +564,28 @@ bool EventBase::runInEventBaseThread(Func fn) {
   return true;
 }
 
-bool EventBase::runInEventBaseThreadAndWait(FuncRef fn) {
+bool EventBase::runInEventBaseThreadAndWait(Func fn) {
   if (inRunningEventBaseThread()) {
     LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
                << "allowed";
     return false;
   }
 
-  bool ready = false;
-  std::mutex m;
-  std::condition_variable cv;
-  runInEventBaseThread([&] {
-      SCOPE_EXIT {
-        std::unique_lock<std::mutex> l(m);
-        ready = true;
-        cv.notify_one();
-        // We cannot release the lock before notify_one, because a spurious
-        // wakeup in the waiting thread may lead to cv and m going out of scope
-        // prematurely.
-      };
-      fn();
+  Baton<> ready;
+  runInEventBaseThread([&ready, fn = std::move(fn)]() mutable {
+    SCOPE_EXIT {
+      ready.post();
+    };
+    // A trick to force the stored functor to be executed and then destructed
+    // before posting the baton and waking the waiting thread.
+    copy(std::move(fn))();
   });
-  std::unique_lock<std::mutex> l(m);
-  cv.wait(l, [&] { return ready; });
+  ready.wait();
 
   return true;
 }
 
-bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(FuncRef fn) {
+bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) {
   if (isInEventBaseThread()) {
     fn();
     return true;
@@ -602,7 +612,7 @@ bool EventBase::runLoopCallbacks() {
     while (!currentCallbacks.empty()) {
       LoopCallback* callback = &currentCallbacks.front();
       currentCallbacks.pop_front();
-      folly::RequestContextScopeGuard rctx(callback->context_);
+      folly::RequestContextScopeGuard rctx(std::move(callback->context_));
       callback->runLoopCallback();
     }
 
@@ -614,12 +624,12 @@ bool EventBase::runLoopCallbacks() {
 
 void EventBase::initNotificationQueue() {
   // Infinite size queue
-  queue_.reset(new NotificationQueue<Func>());
+  queue_ = std::make_unique<NotificationQueue<Func>>();
 
   // We allocate fnRunner_ separately, rather than declaring it directly
   // as a member of EventBase solely so that we don't need to include
   // NotificationQueue.h from EventBase.h
-  fnRunner_.reset(new FunctionRunner());
+  fnRunner_ = std::make_unique<FunctionRunner>();
 
   // Mark this as an internal event, so event_base_loop() will return if
   // there are no other events besides this one installed.
@@ -697,7 +707,7 @@ void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
 
 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
                                  TimeoutManager::timeout_type timeout) {
-  assert(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   // Set up the timeval and add the event
   struct timeval tv;
   tv.tv_sec = long(timeout.count() / 1000LL);
@@ -713,7 +723,7 @@ bool EventBase::scheduleTimeout(AsyncTimeout* obj,
 }
 
 void EventBase::cancelTimeout(AsyncTimeout* obj) {
-  assert(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   struct event* ev = obj->getEvent();
   if (EventUtil::isEventRegistered(ev)) {
     event_del(ev);
@@ -721,7 +731,7 @@ void EventBase::cancelTimeout(AsyncTimeout* obj) {
 }
 
 void EventBase::setName(const std::string& name) {
-  assert(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   name_ = name;
 
   if (isRunning()) {
@@ -731,7 +741,7 @@ void EventBase::setName(const std::string& name) {
 }
 
 const std::string& EventBase::getName() {
-  assert(isInEventBaseThread());
+  dcheckIsInEventBaseThread();
   return name_;
 }
 
@@ -740,10 +750,10 @@ const char* EventBase::getLibeventMethod() { return event_get_method(); }
 
 VirtualEventBase& EventBase::getVirtualEventBase() {
   folly::call_once(virtualEventBaseInitFlag_, [&] {
-    virtualEventBase_ = folly::make_unique<VirtualEventBase>(*this);
+    virtualEventBase_ = std::make_unique<VirtualEventBase>(*this);
   });
 
   return *virtualEventBase_;
 }
 
-} // folly
+} // namespace folly