Fix RequestContext held too long issue in EventBase
[folly.git] / folly / io / async / EventBase.h
index 20d282b5f62642d71048d24f24b453dc7e6a1401..3399cea2dacbdcc57af56f3318efd1837b65583e 100644 (file)
 #pragma once
 
 #include <atomic>
+#include <cerrno>
+#include <cmath>
 #include <cstdlib>
-#include <errno.h>
 #include <functional>
 #include <list>
-#include <math.h>
 #include <memory>
 #include <mutex>
 #include <queue>
 
 #include <boost/intrusive/list.hpp>
 #include <boost/utility.hpp>
+#include <glog/logging.h>
 
 #include <folly/Executor.h>
 #include <folly/Function.h>
 #include <folly/Portability.h>
 #include <folly/ScopeGuard.h>
+#include <folly/executors/DrivableExecutor.h>
 #include <folly/experimental/ExecutionObserver.h>
-#include <folly/futures/DrivableExecutor.h>
 #include <folly/io/async/AsyncTimeout.h>
 #include <folly/io/async/HHWheelTimer.h>
 #include <folly/io/async/Request.h>
 #include <folly/io/async/TimeoutManager.h>
-#include <folly/portability/PThread.h>
-#include <glog/logging.h>
-
-#include <event.h>  // libevent
+#include <folly/portability/Event.h>
+#include <folly/synchronization/CallOnce.h>
 
 namespace folly {
 
@@ -63,7 +62,7 @@ class EventBaseLocalBaseBase {
   virtual void onEventBaseDestruction(EventBase& evb) = 0;
   virtual ~EventBaseLocalBaseBase() = default;
 };
-}
+} // namespace detail
 template <typename T>
 class EventBaseLocal;
 
@@ -96,6 +95,10 @@ class RequestEventBase : public RequestData {
         std::unique_ptr<RequestEventBase>(new RequestEventBase(eb)));
   }
 
+  bool hasCallback() override {
+    return false;
+  }
+
  private:
   explicit RequestEventBase(EventBase* eb) : eb_(eb) {}
   EventBase* eb_;
@@ -127,7 +130,6 @@ class EventBase : private boost::noncopyable,
                   public DrivableExecutor {
  public:
   using Func = folly::Function<void()>;
-  using FuncRef = folly::FunctionRef<void()>;
 
   /**
    * A callback interface to use with runInLoop()
@@ -148,6 +150,7 @@ class EventBase : private boost::noncopyable,
 
     virtual void runLoopCallback() noexcept = 0;
     void cancelLoopCallback() {
+      context_.reset();
       unlink();
     }
 
@@ -221,7 +224,7 @@ class EventBase : private boost::noncopyable,
    *                              observer, max latency and avg loop time.
    */
   explicit EventBase(event_base* evb, bool enableTimeMeasurement = true);
-  ~EventBase();
+  ~EventBase() override;
 
   /**
    * Runs the event loop.
@@ -417,7 +420,7 @@ class EventBase : private boost::noncopyable,
    * Like runInEventBaseThread, but the caller waits for the callback to be
    * executed.
    */
-  bool runInEventBaseThreadAndWait(FuncRef fn);
+  bool runInEventBaseThreadAndWait(Func fn);
 
   /*
    * Like runInEventBaseThreadAndWait, except if the caller is already in the
@@ -430,7 +433,7 @@ class EventBase : private boost::noncopyable,
    * Like runInEventBaseThreadAndWait, except if the caller is already in the
    * event base thread, the functor is simply run inline.
    */
-  bool runImmediatelyOrRunInEventBaseThreadAndWait(FuncRef fn);
+  bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn);
 
   /**
    * Set the maximum desired latency in us and provide a callback which will be
@@ -466,7 +469,7 @@ class EventBase : private boost::noncopyable,
     * check if the event base loop is running.
    */
   bool isRunning() const {
-    return loopThread_.load(std::memory_order_relaxed) != 0;
+    return loopThread_.load(std::memory_order_relaxed) != std::thread::id();
   }
 
   /**
@@ -484,12 +487,24 @@ class EventBase : private boost::noncopyable,
    */
   bool isInEventBaseThread() const {
     auto tid = loopThread_.load(std::memory_order_relaxed);
-    return tid == 0 || pthread_equal(tid, pthread_self());
+    return tid == std::thread::id() || tid == std::this_thread::get_id();
   }
 
   bool inRunningEventBaseThread() const {
-    return pthread_equal(
-      loopThread_.load(std::memory_order_relaxed), pthread_self());
+    return loopThread_.load(std::memory_order_relaxed) ==
+        std::this_thread::get_id();
+  }
+
+  /**
+   * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for
+   * dcheckIsInEventBaseThread), but it prints more information on
+   * failure.
+   */
+  void checkIsInEventBaseThread() const;
+  void dcheckIsInEventBaseThread() const {
+    if (kIsDebug) {
+      checkIsInEventBaseThread();
+    }
   }
 
   HHWheelTimer& timer() {
@@ -515,7 +530,7 @@ class EventBase : private boost::noncopyable,
    * first handler fired within that cycle.
    *
    */
-  void bumpHandlingTime() override final;
+  void bumpHandlingTime() final;
 
   class SmoothLoopTime {
    public:
@@ -592,8 +607,6 @@ class EventBase : private boost::noncopyable,
 
   /// Implements the DrivableExecutor interface
   void drive() override {
-    // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
-    // released inside a loop.
     ++loopKeepAliveCount_;
     SCOPE_EXIT {
       --loopKeepAliveCount_;
@@ -601,51 +614,55 @@ class EventBase : private boost::noncopyable,
     loopOnce();
   }
 
-  struct LoopKeepAliveDeleter {
-    void operator()(EventBase* evb) {
-      DCHECK(evb->isInEventBaseThread());
-      evb->loopKeepAliveCount_--;
-    }
-  };
-  using LoopKeepAlive = std::unique_ptr<EventBase, LoopKeepAliveDeleter>;
-
   /// Returns you a handle which make loop() behave like loopForever() until
   /// destroyed. loop() will return to its original behavior only when all
-  /// loop keep-alives are released. Loop holder is safe to release only from
-  /// EventBase thread.
-  ///
-  /// May return no op LoopKeepAlive if loopForever() is already running.
-  LoopKeepAlive loopKeepAlive() {
-    DCHECK(isInEventBaseThread());
-    loopKeepAliveCount_++;
-    return LoopKeepAlive(this);
-  }
-
-  // Thread-safe version of loopKeepAlive()
-  LoopKeepAlive loopKeepAliveAtomic() {
-    if (inRunningEventBaseThread()) {
-      return loopKeepAlive();
-    }
-    loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
-    return LoopKeepAlive(this);
+  /// loop keep-alives are released.
+  KeepAlive getKeepAliveToken() override {
+    keepAliveAcquire();
+    return makeKeepAlive();
   }
 
   // TimeoutManager
   void attachTimeoutManager(
       AsyncTimeout* obj,
-      TimeoutManager::InternalEnum internal) override final;
+      TimeoutManager::InternalEnum internal) final;
 
-  void detachTimeoutManager(AsyncTimeout* obj) override final;
+  void detachTimeoutManager(AsyncTimeout* obj) final;
 
   bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout)
-      override final;
+      final;
 
-  void cancelTimeout(AsyncTimeout* obj) override final;
+  void cancelTimeout(AsyncTimeout* obj) final;
 
-  bool isInTimeoutManagerThread() override final {
+  bool isInTimeoutManagerThread() final {
     return isInEventBaseThread();
   }
 
+  // Returns a VirtualEventBase attached to this EventBase. Can be used to
+  // pass to APIs which expect VirtualEventBase. This VirtualEventBase will be
+  // destroyed together with the EventBase.
+  //
+  // Any number of VirtualEventBases instances may be independently constructed,
+  // which are backed by this EventBase. This method should be only used if you
+  // don't need to manage the life time of the VirtualEventBase used.
+  folly::VirtualEventBase& getVirtualEventBase();
+
+ protected:
+  void keepAliveAcquire() override {
+    if (inRunningEventBaseThread()) {
+      loopKeepAliveCount_++;
+    } else {
+      loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
+    }
+  }
+
+  void keepAliveRelease() override {
+    if (!inRunningEventBaseThread()) {
+      return add([=] { keepAliveRelease(); });
+    }
+    loopKeepAliveCount_--;
+  }
+
  private:
   void applyLoopKeepAlive();
 
@@ -685,11 +702,8 @@ class EventBase : private boost::noncopyable,
   std::atomic<bool> stop_;
 
   // The ID of the thread running the main loop.
-  // 0 if loop is not running.
-  // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or
-  // even that atomic<pthread_t> is valid), but that's how it is
-  // everywhere (at least on Linux, FreeBSD, and OSX).
-  std::atomic<pthread_t> loopThread_;
+  // std::thread::id{} if loop is not running.
+  std::atomic<std::thread::id> loopThread_;
 
   // pointer to underlying event_base class doing the heavy lifting
   event_base* evb_;
@@ -724,7 +738,7 @@ class EventBase : private boost::noncopyable,
   // Wrap-around loop counter to detect beginning of each loop
   uint64_t nextLoopCnt_;
   uint64_t latestLoopCnt_;
-  uint64_t startWork_;
+  std::chrono::steady_clock::time_point startWork_;
   // Prevent undefined behavior from invoking event_base_loop() reentrantly.
   // This is needed since many projects use libevent-1.4, which lacks commit
   // b557b175c00dc462c1fce25f6e7dd67121d2c001 from
@@ -747,9 +761,11 @@ class EventBase : private boost::noncopyable,
   // see EventBaseLocal
   friend class detail::EventBaseLocalBase;
   template <typename T> friend class EventBaseLocal;
-  std::mutex localStorageMutex_;
   std::unordered_map<uint64_t, std::shared_ptr<void>> localStorage_;
   std::unordered_set<detail::EventBaseLocalBaseBase*> localStorageToDtor_;
+
+  folly::once_flag virtualEventBaseInitFlag_;
+  std::unique_ptr<VirtualEventBase> virtualEventBase_;
 };
 
 template <typename T>
@@ -769,4 +785,4 @@ bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(
   return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); });
 }
 
-} // folly
+} // namespace folly