Add keepAlive() mechanism
[folly.git] / folly / io / async / EventBase.h
index 89e89fd1b1765bf0d2b77bf98641a47d3dd46941..c318d609ef6c3ada5e04865cf7724ece4a168df7 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 #include <folly/Executor.h>
 #include <folly/Function.h>
 #include <folly/Portability.h>
+#include <folly/ScopeGuard.h>
 #include <folly/experimental/ExecutionObserver.h>
 #include <folly/futures/DrivableExecutor.h>
 #include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/async/HHWheelTimer.h>
 #include <folly/io/async/Request.h>
 #include <folly/io/async/TimeoutManager.h>
-#include <folly/portability/PThread.h>
 #include <glog/logging.h>
 
 #include <event.h>  // libevent
 
 namespace folly {
 
-typedef std::function<void()> Cob;
+using Cob = Func; // defined in folly/Executor.h
 template <typename MessageT>
 class NotificationQueue;
 
@@ -100,6 +101,8 @@ class RequestEventBase : public RequestData {
   static constexpr const char* kContextDataName{"EventBase"};
 };
 
+class VirtualEventBase;
+
 /**
  * This class is a wrapper for all asynchronous I/O processing functionality
  *
@@ -123,6 +126,7 @@ class EventBase : private boost::noncopyable,
                   public DrivableExecutor {
  public:
   using Func = folly::Function<void()>;
+  using FuncRef = folly::FunctionRef<void()>;
 
   /**
    * A callback interface to use with runInLoop()
@@ -135,36 +139,62 @@ class EventBase : private boost::noncopyable,
    * If a LoopCallback object is destroyed while it is scheduled to be run in
    * the next loop iteration, it will automatically be cancelled.
    */
-  class LoopCallback {
+  class LoopCallback
+      : public boost::intrusive::list_base_hook<
+            boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
    public:
     virtual ~LoopCallback() = default;
 
     virtual void runLoopCallback() noexcept = 0;
     void cancelLoopCallback() {
-      hook_.unlink();
+      unlink();
     }
 
     bool isLoopCallbackScheduled() const {
-      return hook_.is_linked();
+      return is_linked();
     }
 
    private:
-    typedef boost::intrusive::list_member_hook<
-      boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
-
-    ListHook hook_;
-
     typedef boost::intrusive::list<
       LoopCallback,
-      boost::intrusive::member_hook<LoopCallback, ListHook,
-                                    &LoopCallback::hook_>,
       boost::intrusive::constant_time_size<false> > List;
 
     // EventBase needs access to LoopCallbackList (and therefore to hook_)
     friend class EventBase;
+    friend class VirtualEventBase;
     std::shared_ptr<RequestContext> context_;
   };
 
+  class FunctionLoopCallback : public LoopCallback {
+   public:
+    explicit FunctionLoopCallback(Func&& function)
+        : function_(std::move(function)) {}
+
+    void runLoopCallback() noexcept override {
+      function_();
+      delete this;
+    }
+
+   private:
+    Func function_;
+  };
+
+  // Like FunctionLoopCallback, but saves one allocation. Use with caution.
+  //
+  // The caller is responsible for maintaining the lifetime of this callback
+  // until after the point at which the contained function is called.
+  class StackFunctionLoopCallback : public LoopCallback {
+   public:
+    explicit StackFunctionLoopCallback(Func&& function)
+        : function_(std::move(function)) {}
+    void runLoopCallback() noexcept override {
+      Func(std::move(function_))();
+    }
+
+   private:
+    Func function_;
+  };
+
   /**
    * Create a new EventBase object.
    *
@@ -322,15 +352,6 @@ class EventBase : private boost::noncopyable,
    */
   void runOnDestruction(LoopCallback* callback);
 
-  /**
-   * Adds the given callback to a queue of things run after the notification
-   * queue is drained before the destruction of current EventBase.
-   *
-   * Note: will be called from the thread that invoked EventBase destructor,
-   *       after the final run of loop callbacks.
-   */
-  void runAfterDrain(Func cob);
-
   /**
    * Adds a callback that will run immediately *before* the event loop.
    * This is very similar to runInLoop(), but will not cause the loop to break:
@@ -395,7 +416,7 @@ class EventBase : private boost::noncopyable,
    * Like runInEventBaseThread, but the caller waits for the callback to be
    * executed.
    */
-  bool runInEventBaseThreadAndWait(Func fn);
+  bool runInEventBaseThreadAndWait(FuncRef fn);
 
   /*
    * Like runInEventBaseThreadAndWait, except if the caller is already in the
@@ -408,47 +429,24 @@ class EventBase : private boost::noncopyable,
    * Like runInEventBaseThreadAndWait, except if the caller is already in the
    * event base thread, the functor is simply run inline.
    */
-  bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn);
-
-  /**
-   * Runs the given Cob at some time after the specified number of
-   * milliseconds.  (No guarantees exactly when.)
-   *
-   * Throws a std::system_error if an error occurs.
-   */
-  void runAfterDelay(
-      Func c,
-      uint32_t milliseconds,
-      TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
-
-  /**
-   * @see tryRunAfterDelay for more details
-   *
-   * @return  true iff the cob was successfully registered.
-   *
-   * */
-  bool tryRunAfterDelay(
-      Func cob,
-      uint32_t milliseconds,
-      TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
+  bool runImmediatelyOrRunInEventBaseThreadAndWait(FuncRef fn);
 
   /**
    * Set the maximum desired latency in us and provide a callback which will be
    * called when that latency is exceeded.
    * OBS: This functionality depends on time-measurement.
    */
-  void setMaxLatency(int64_t maxLatency, Func maxLatencyCob) {
+  void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob) {
     assert(enableTimeMeasurement_);
     maxLatency_ = maxLatency;
     maxLatencyCob_ = std::move(maxLatencyCob);
   }
 
-
   /**
    * Set smoothing coefficient for loop load average; # of milliseconds
    * for exp(-1) (1/2.71828...) decay.
    */
-  void setLoadAvgMsec(uint32_t ms);
+  void setLoadAvgMsec(std::chrono::milliseconds ms);
 
   /**
    * reset the load average to a desired value
@@ -467,7 +465,7 @@ class EventBase : private boost::noncopyable,
     * check if the event base loop is running.
    */
   bool isRunning() const {
-    return loopThread_.load(std::memory_order_relaxed) != 0;
+    return loopThread_.load(std::memory_order_relaxed) != std::thread::id();
   }
 
   /**
@@ -475,7 +473,7 @@ class EventBase : private boost::noncopyable,
    */
   void waitUntilRunning();
 
-  int getNotificationQueueSize() const;
+  size_t getNotificationQueueSize() const;
 
   void setMaxReadAtOnce(uint32_t maxAtOnce);
 
@@ -485,12 +483,19 @@ class EventBase : private boost::noncopyable,
    */
   bool isInEventBaseThread() const {
     auto tid = loopThread_.load(std::memory_order_relaxed);
-    return tid == 0 || pthread_equal(tid, pthread_self());
+    return tid == std::thread::id() || tid == std::this_thread::get_id();
   }
 
   bool inRunningEventBaseThread() const {
-    return pthread_equal(
-      loopThread_.load(std::memory_order_relaxed), pthread_self());
+    return loopThread_.load(std::memory_order_relaxed) ==
+        std::this_thread::get_id();
+  }
+
+  HHWheelTimer& timer() {
+    if (!wheelTimer_) {
+      wheelTimer_ = HHWheelTimer::newTimer(this);
+    }
+    return *wheelTimer_.get();
   }
 
   // --------- interface to underlying libevent base ------------
@@ -513,17 +518,19 @@ class EventBase : private boost::noncopyable,
 
   class SmoothLoopTime {
    public:
-    explicit SmoothLoopTime(uint64_t timeInterval)
-      : expCoeff_(-1.0/timeInterval)
-      , value_(0.0)
-      , oldBusyLeftover_(0) {
+    explicit SmoothLoopTime(std::chrono::microseconds timeInterval)
+        : expCoeff_(-1.0 / timeInterval.count()),
+          value_(0.0),
+          oldBusyLeftover_(0) {
       VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
     }
 
-    void setTimeInterval(uint64_t timeInterval);
+    void setTimeInterval(std::chrono::microseconds timeInterval);
     void reset(double value = 0.0);
 
-    void addSample(int64_t idle, int64_t busy);
+    void addSample(
+        std::chrono::microseconds idle,
+        std::chrono::microseconds busy);
 
     double get() const {
       return value_;
@@ -534,9 +541,9 @@ class EventBase : private boost::noncopyable,
     }
 
    private:
-    double  expCoeff_;
-    double  value_;
-    int64_t oldBusyLeftover_;
+    double expCoeff_;
+    double value_;
+    std::chrono::microseconds oldBusyLeftover_;
   };
 
   void setObserver(const std::shared_ptr<EventBaseObserver>& observer) {
@@ -584,93 +591,77 @@ class EventBase : private boost::noncopyable,
 
   /// Implements the DrivableExecutor interface
   void drive() override {
+    // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
+    // released inside a loop.
+    ++loopKeepAliveCount_;
+    SCOPE_EXIT {
+      --loopKeepAliveCount_;
+    };
     loopOnce();
   }
 
-  struct LoopKeepAliveDeleter {
-    void operator()(EventBase* evb) {
-      DCHECK(evb->isInEventBaseThread());
-      evb->loopKeepAliveCount_--;
-    }
-  };
-  using LoopKeepAlive = std::unique_ptr<EventBase, LoopKeepAliveDeleter>;
-
   /// Returns you a handle which make loop() behave like loopForever() until
   /// destroyed. loop() will return to its original behavior only when all
   /// loop keep-alives are released. Loop holder is safe to release only from
   /// EventBase thread.
-  ///
-  /// May return no op LoopKeepAlive if loopForever() is already running.
-  LoopKeepAlive loopKeepAlive() {
-    DCHECK(isInEventBaseThread());
-    loopKeepAliveCount_++;
-    return LoopKeepAlive(this);
+  KeepAlive getKeepAliveToken() override {
+    if (inRunningEventBaseThread()) {
+      loopKeepAliveCount_++;
+    } else {
+      loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
+    }
+    return makeKeepAlive();
   }
 
- private:
   // TimeoutManager
-  void attachTimeoutManager(AsyncTimeout* obj,
-                            TimeoutManager::InternalEnum internal) override;
+  void attachTimeoutManager(
+      AsyncTimeout* obj,
+      TimeoutManager::InternalEnum internal) override final;
 
-  void detachTimeoutManager(AsyncTimeout* obj) override;
+  void detachTimeoutManager(AsyncTimeout* obj) override final;
 
   bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout)
-    override;
+      override final;
 
-  void cancelTimeout(AsyncTimeout* obj) override;
+  void cancelTimeout(AsyncTimeout* obj) override final;
 
   bool isInTimeoutManagerThread() override final {
     return isInEventBaseThread();
   }
 
+ protected:
+  void keepAliveRelease() override {
+    DCHECK(isInEventBaseThread());
+    loopKeepAliveCount_--;
+  }
+
+ private:
   void applyLoopKeepAlive();
 
+  ssize_t loopKeepAliveCount();
+
   /*
    * Helper function that tells us whether we have already handled
    * some event/timeout/callback in this loop iteration.
    */
   bool nothingHandledYet() const noexcept;
 
-  // small object used as a callback arg with enough info to execute the
-  // appropriate client-provided Cob
-  class CobTimeout : public AsyncTimeout {
-   public:
-    CobTimeout(EventBase* b, Func c, TimeoutManager::InternalEnum in)
-        : AsyncTimeout(b, in), cob_(std::move(c)) {}
-
-    virtual void timeoutExpired() noexcept;
-
-   private:
-    Func cob_;
-
-   public:
-    typedef boost::intrusive::list_member_hook<
-      boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
-
-    ListHook hook;
-
-    typedef boost::intrusive::list<
-      CobTimeout,
-      boost::intrusive::member_hook<CobTimeout, ListHook, &CobTimeout::hook>,
-      boost::intrusive::constant_time_size<false> > List;
-  };
-
   typedef LoopCallback::List LoopCallbackList;
   class FunctionRunner;
 
   bool loopBody(int flags = 0);
 
   // executes any callbacks queued by runInLoop(); returns false if none found
-  bool runLoopCallbacks(bool setContext = true);
+  bool runLoopCallbacks();
 
   void initNotificationQueue();
 
-  CobTimeout::List pendingCobTimeouts_;
+  // should only be accessed through public getter
+  HHWheelTimer::UniquePtr wheelTimer_;
 
   LoopCallbackList loopCallbacks_;
   LoopCallbackList runBeforeLoopCallbacks_;
   LoopCallbackList onDestructionCallbacks_;
-  LoopCallbackList runAfterDrainCallbacks_;
 
   // This will be null most of the time, but point to currentCallbacks
   // if we are in the middle of running loop callbacks, such that
@@ -683,11 +674,8 @@ class EventBase : private boost::noncopyable,
   std::atomic<bool> stop_;
 
   // The ID of the thread running the main loop.
-  // 0 if loop is not running.
-  // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or
-  // even that atomic<pthread_t> is valid), but that's how it is
-  // everywhere (at least on Linux, FreeBSD, and OSX).
-  std::atomic<pthread_t> loopThread_;
+  // std::thread::id{} if loop is not running.
+  std::atomic<std::thread::id> loopThread_;
 
   // pointer to underlying event_base class doing the heavy lifting
   event_base* evb_;
@@ -696,11 +684,12 @@ class EventBase : private boost::noncopyable,
   // to send function requests to the EventBase thread.
   std::unique_ptr<NotificationQueue<Func>> queue_;
   std::unique_ptr<FunctionRunner> fnRunner_;
-  size_t loopKeepAliveCount_{0};
+  ssize_t loopKeepAliveCount_{0};
+  std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
   bool loopKeepAliveActive_{false};
 
   // limit for latency in microseconds (0 disables)
-  int64_t maxLatency_;
+  std::chrono::microseconds maxLatency_;
 
   // exponentially-smoothed average loop time for latency-limiting
   SmoothLoopTime avgLoopTime_;
@@ -718,14 +707,10 @@ class EventBase : private boost::noncopyable,
   // be supported: avg loop time, observer and max latency.
   const bool enableTimeMeasurement_;
 
-  // we'll wait this long before running deferred callbacks if the event
-  // loop is idle.
-  static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms
-
   // Wrap-around loop counter to detect beginning of each loop
   uint64_t nextLoopCnt_;
   uint64_t latestLoopCnt_;
-  uint64_t startWork_;
+  std::chrono::steady_clock::time_point startWork_;
   // Prevent undefined behavior from invoking event_base_loop() reentrantly.
   // This is needed since many projects use libevent-1.4, which lacks commit
   // b557b175c00dc462c1fce25f6e7dd67121d2c001 from
@@ -745,9 +730,6 @@ class EventBase : private boost::noncopyable,
   // allow runOnDestruction() to be called from any threads
   std::mutex onDestructionCallbacksMutex_;
 
-  // allow runAfterDrain() to be called from any threads
-  std::mutex runAfterDrainCallbacksMutex_;
-
   // see EventBaseLocal
   friend class detail::EventBaseLocalBase;
   template <typename T> friend class EventBaseLocal;