Unset RequestContext properly in EventBase::runLoopCallbacks
[folly.git] / folly / io / async / EventBase.h
index e15465c9e12fb673f686d6d9a12976a401764f9b..b450e97fee40762ed82c4ea83cd4797659fc1ee2 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2016 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 #pragma once
 
-#include <glog/logging.h>
-#include <folly/io/async/AsyncTimeout.h>
-#include <folly/io/async/TimeoutManager.h>
-#include <folly/io/async/Request.h>
-#include <folly/Executor.h>
-#include <folly/experimental/ExecutionObserver.h>
-#include <folly/futures/DrivableExecutor.h>
-#include <memory>
-#include <stack>
+#include <atomic>
+#include <cstdlib>
+#include <errno.h>
+#include <functional>
 #include <list>
+#include <math.h>
+#include <memory>
+#include <mutex>
 #include <queue>
-#include <cstdlib>
 #include <set>
-#include <unordered_set>
+#include <stack>
 #include <unordered_map>
-#include <mutex>
+#include <unordered_set>
 #include <utility>
+
 #include <boost/intrusive/list.hpp>
 #include <boost/utility.hpp>
-#include <functional>
+
+#include <folly/Executor.h>
+#include <folly/Function.h>
+#include <folly/Portability.h>
+#include <folly/experimental/ExecutionObserver.h>
+#include <folly/futures/DrivableExecutor.h>
+#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/async/HHWheelTimer.h>
+#include <folly/io/async/Request.h>
+#include <folly/io/async/TimeoutManager.h>
+#include <folly/portability/PThread.h>
+#include <glog/logging.h>
+
 #include <event.h>  // libevent
-#include <errno.h>
-#include <math.h>
-#include <atomic>
 
 namespace folly {
 
@@ -47,13 +54,17 @@ typedef std::function<void()> Cob;
 template <typename MessageT>
 class NotificationQueue;
 
-#if !defined(ANDROID) && !defined(__ANDROID__) && !defined(__APPLE__)
 namespace detail {
 class EventBaseLocalBase;
+
+class EventBaseLocalBaseBase {
+ public:
+  virtual void onEventBaseDestruction(EventBase& evb) = 0;
+  virtual ~EventBaseLocalBaseBase() = default;
+};
 }
 template <typename T>
 class EventBaseLocal;
-#endif
 
 class EventBaseObserver {
  public:
@@ -112,6 +123,8 @@ class EventBase : private boost::noncopyable,
                   public TimeoutManager,
                   public DrivableExecutor {
  public:
+  using Func = folly::Function<void()>;
+
   /**
    * A callback interface to use with runInLoop()
    *
@@ -283,13 +296,12 @@ class EventBase : private boost::noncopyable,
   void runInLoop(LoopCallback* callback, bool thisIteration = false);
 
   /**
-   * Convenience function to call runInLoop() with a std::function.
+   * Convenience function to call runInLoop() with a folly::Function.
    *
-   * This creates a LoopCallback object to wrap the std::function, and invoke
-   * the std::function when the loop callback fires.  This is slightly more
+   * This creates a LoopCallback object to wrap the folly::Function, and invoke
+   * the folly::Function when the loop callback fires.  This is slightly more
    * expensive than defining your own LoopCallback, but more convenient in
-   * areas that aren't performance sensitive where you just want to use
-   * std::bind.  (std::bind is fairly slow on even by itself.)
+   * areas that aren't too performance sensitive.
    *
    * This method may only be called from the EventBase's thread.  This
    * essentially allows an event handler to schedule an additional callback to
@@ -297,9 +309,7 @@ class EventBase : private boost::noncopyable,
    *
    * Use runInEventBaseThread() to schedule functions from another thread.
    */
-  void runInLoop(const Cob& c, bool thisIteration = false);
-
-  void runInLoop(Cob&& c, bool thisIteration = false);
+  void runInLoop(Func c, bool thisIteration = false);
 
   /**
    * Adds the given callback to a queue of things run before destruction
@@ -313,6 +323,15 @@ class EventBase : private boost::noncopyable,
    */
   void runOnDestruction(LoopCallback* callback);
 
+  /**
+   * Adds the given callback to a queue of things run after the notification
+   * queue is drained before the destruction of current EventBase.
+   *
+   * Note: will be called from the thread that invoked EventBase destructor,
+   *       after the final run of loop callbacks.
+   */
+  void runAfterDrain(Func cob);
+
   /**
    * Adds a callback that will run immediately *before* the event loop.
    * This is very similar to runInLoop(), but will not cause the loop to break:
@@ -346,21 +365,17 @@ class EventBase : private boost::noncopyable,
    * @return Returns true if the function was successfully scheduled, or false
    *         if there was an error scheduling the function.
    */
-  template<typename T>
-  bool runInEventBaseThread(void (*fn)(T*), T* arg) {
-    return runInEventBaseThread(reinterpret_cast<void (*)(void*)>(fn),
-                                reinterpret_cast<void*>(arg));
-  }
-
-  bool runInEventBaseThread(void (*fn)(void*), void* arg);
+  template <typename T>
+  bool runInEventBaseThread(void (*fn)(T*), T* arg);
 
   /**
    * Run the specified function in the EventBase's thread
    *
-   * This version of runInEventBaseThread() takes a std::function object.
-   * Note that this is less efficient than the version that takes a plain
-   * function pointer and void* argument, as it has to allocate memory to copy
-   * the std::function object.
+   * This version of runInEventBaseThread() takes a folly::Function object.
+   * Note that this may be less efficient than the version that takes a plain
+   * function pointer and void* argument, if moving the function is expensive
+   * (e.g., if it wraps a lambda which captures some values with expensive move
+   * constructors).
    *
    * If the loop is terminated (and never later restarted) before it has a
    * chance to run the requested function, the function will be run upon the
@@ -368,56 +383,33 @@ class EventBase : private boost::noncopyable,
    *
    * The function must not throw any exceptions.
    */
-  bool runInEventBaseThread(const Cob& fn);
+  bool runInEventBaseThread(Func fn);
 
   /*
    * Like runInEventBaseThread, but the caller waits for the callback to be
    * executed.
    */
-  template<typename T>
-  bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) {
-    return runInEventBaseThreadAndWait(reinterpret_cast<void (*)(void*)>(fn),
-                                       reinterpret_cast<void*>(arg));
-  }
-
-  /*
-   * Like runInEventBaseThread, but the caller waits for the callback to be
-   * executed.
-   */
-  bool runInEventBaseThreadAndWait(void (*fn)(void*), void* arg) {
-    return runInEventBaseThreadAndWait(std::bind(fn, arg));
-  }
+  template <typename T>
+  bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg);
 
   /*
    * Like runInEventBaseThread, but the caller waits for the callback to be
    * executed.
    */
-  bool runInEventBaseThreadAndWait(const Cob& fn);
+  bool runInEventBaseThreadAndWait(Func fn);
 
   /*
    * Like runInEventBaseThreadAndWait, except if the caller is already in the
    * event base thread, the functor is simply run inline.
    */
-  template<typename T>
-  bool runImmediatelyOrRunInEventBaseThreadAndWait(void (*fn)(T*), T* arg) {
-    return runImmediatelyOrRunInEventBaseThreadAndWait(
-        reinterpret_cast<void (*)(void*)>(fn), reinterpret_cast<void*>(arg));
-  }
+  template <typename T>
+  bool runImmediatelyOrRunInEventBaseThreadAndWait(void (*fn)(T*), T* arg);
 
   /*
    * Like runInEventBaseThreadAndWait, except if the caller is already in the
    * event base thread, the functor is simply run inline.
    */
-  bool runImmediatelyOrRunInEventBaseThreadAndWait(
-      void (*fn)(void*), void* arg) {
-    return runImmediatelyOrRunInEventBaseThreadAndWait(std::bind(fn, arg));
-  }
-
-  /*
-   * Like runInEventBaseThreadAndWait, except if the caller is already in the
-   * event base thread, the functor is simply run inline.
-   */
-  bool runImmediatelyOrRunInEventBaseThreadAndWait(const Cob& fn);
+  bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn);
 
   /**
    * Runs the given Cob at some time after the specified number of
@@ -426,8 +418,8 @@ class EventBase : private boost::noncopyable,
    * Throws a std::system_error if an error occurs.
    */
   void runAfterDelay(
-      const Cob& c,
-      int milliseconds,
+      Func c,
+      uint32_t milliseconds,
       TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
 
   /**
@@ -437,8 +429,8 @@ class EventBase : private boost::noncopyable,
    *
    * */
   bool tryRunAfterDelay(
-      const Cob& cob,
-      int milliseconds,
+      Func cob,
+      uint32_t milliseconds,
       TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
 
   /**
@@ -446,10 +438,10 @@ class EventBase : private boost::noncopyable,
    * called when that latency is exceeded.
    * OBS: This functionality depends on time-measurement.
    */
-  void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) {
+  void setMaxLatency(int64_t maxLatency, Func maxLatencyCob) {
     assert(enableTimeMeasurement_);
     maxLatency_ = maxLatency;
-    maxLatencyCob_ = maxLatencyCob;
+    maxLatencyCob_ = std::move(maxLatencyCob);
   }
 
 
@@ -502,6 +494,13 @@ class EventBase : private boost::noncopyable,
       loopThread_.load(std::memory_order_relaxed), pthread_self());
   }
 
+  HHWheelTimer& timer() {
+    if (!wheelTimer_) {
+      wheelTimer_ = HHWheelTimer::newTimer(this);
+    }
+    return *wheelTimer_.get();
+  }
+
   // --------- interface to underlying libevent base ------------
   // Avoid using these functions if possible.  These functions are not
   // guaranteed to always be present if we ever provide alternative EventBase
@@ -518,7 +517,7 @@ class EventBase : private boost::noncopyable,
    * first handler fired within that cycle.
    *
    */
-  bool bumpHandlingTime() override;
+  void bumpHandlingTime() override final;
 
   class SmoothLoopTime {
    public:
@@ -588,16 +587,36 @@ class EventBase : private boost::noncopyable,
   void add(Cob fn) override {
     // runInEventBaseThread() takes a const&,
     // so no point in doing std::move here.
-    runInEventBaseThread(fn);
+    runInEventBaseThread(std::move(fn));
   }
 
   /// Implements the DrivableExecutor interface
   void drive() override {
+    auto keepAlive = loopKeepAlive();
     loopOnce();
   }
 
- private:
+  struct LoopKeepAliveDeleter {
+    void operator()(EventBase* evb) {
+      DCHECK(evb->isInEventBaseThread());
+      evb->loopKeepAliveCount_--;
+    }
+  };
+  using LoopKeepAlive = std::unique_ptr<EventBase, LoopKeepAliveDeleter>;
+
+  /// Returns you a handle which make loop() behave like loopForever() until
+  /// destroyed. loop() will return to its original behavior only when all
+  /// loop keep-alives are released. Loop holder is safe to release only from
+  /// EventBase thread.
+  ///
+  /// May return no op LoopKeepAlive if loopForever() is already running.
+  LoopKeepAlive loopKeepAlive() {
+    DCHECK(isInEventBaseThread());
+    loopKeepAliveCount_++;
+    return LoopKeepAlive(this);
+  }
 
+ private:
   // TimeoutManager
   void attachTimeoutManager(AsyncTimeout* obj,
                             TimeoutManager::InternalEnum internal) override;
@@ -609,42 +628,29 @@ class EventBase : private boost::noncopyable,
 
   void cancelTimeout(AsyncTimeout* obj) override;
 
-  bool isInTimeoutManagerThread() override {
+  bool isInTimeoutManagerThread() override final {
     return isInEventBaseThread();
   }
 
-  // Helper class used to short circuit runInEventBaseThread
-  class RunInLoopCallback : public LoopCallback {
-   public:
-    RunInLoopCallback(void (*fn)(void*), void* arg);
-    void runLoopCallback() noexcept;
-
-   private:
-    void (*fn_)(void*);
-    void* arg_;
-  };
+  void applyLoopKeepAlive();
 
   /*
    * Helper function that tells us whether we have already handled
    * some event/timeout/callback in this loop iteration.
    */
-  bool nothingHandledYet();
-
-  // --------- libevent callbacks (not for client use) ------------
-
-  static void runFunctionPtr(std::function<void()>* fn);
+  bool nothingHandledYet() const noexcept;
 
   // small object used as a callback arg with enough info to execute the
   // appropriate client-provided Cob
   class CobTimeout : public AsyncTimeout {
    public:
-    CobTimeout(EventBase* b, const Cob& c, TimeoutManager::InternalEnum in)
-        : AsyncTimeout(b, in), cob_(c) {}
+    CobTimeout(EventBase* b, Func c, TimeoutManager::InternalEnum in)
+        : AsyncTimeout(b, in), cob_(std::move(c)) {}
 
     virtual void timeoutExpired() noexcept;
 
    private:
-    Cob cob_;
+    Func cob_;
 
    public:
     typedef boost::intrusive::list_member_hook<
@@ -664,15 +670,19 @@ class EventBase : private boost::noncopyable,
   bool loopBody(int flags = 0);
 
   // executes any callbacks queued by runInLoop(); returns false if none found
-  bool runLoopCallbacks(bool setContext = true);
+  bool runLoopCallbacks();
 
   void initNotificationQueue();
 
+  // should only be accessed through public getter
+  HHWheelTimer::UniquePtr wheelTimer_;
+
   CobTimeout::List pendingCobTimeouts_;
 
   LoopCallbackList loopCallbacks_;
   LoopCallbackList runBeforeLoopCallbacks_;
   LoopCallbackList onDestructionCallbacks_;
+  LoopCallbackList runAfterDrainCallbacks_;
 
   // This will be null most of the time, but point to currentCallbacks
   // if we are in the middle of running loop callbacks, such that
@@ -696,8 +706,10 @@ class EventBase : private boost::noncopyable,
 
   // A notification queue for runInEventBaseThread() to use
   // to send function requests to the EventBase thread.
-  std::unique_ptr<NotificationQueue<std::pair<void (*)(void*), void*>>> queue_;
+  std::unique_ptr<NotificationQueue<Func>> queue_;
   std::unique_ptr<FunctionRunner> fnRunner_;
+  size_t loopKeepAliveCount_{0};
+  bool loopKeepAliveActive_{false};
 
   // limit for latency in microseconds (0 disables)
   int64_t maxLatency_;
@@ -711,7 +723,7 @@ class EventBase : private boost::noncopyable,
   SmoothLoopTime maxLatencyLoopTime_;
 
   // callback called when latency limit is exceeded
-  Cob maxLatencyCob_;
+  Func maxLatencyCob_;
 
   // Enables/disables time measurements in loopBody(). if disabled, the
   // following functionality that relies on time-measurement, will not
@@ -726,6 +738,11 @@ class EventBase : private boost::noncopyable,
   uint64_t nextLoopCnt_;
   uint64_t latestLoopCnt_;
   uint64_t startWork_;
+  // Prevent undefined behavior from invoking event_base_loop() reentrantly.
+  // This is needed since many projects use libevent-1.4, which lacks commit
+  // b557b175c00dc462c1fce25f6e7dd67121d2c001 from
+  // https://github.com/libevent/libevent/.
+  bool invokingLoop_{false};
 
   // Observer to export counters
   std::shared_ptr<EventBaseObserver> observer_;
@@ -740,14 +757,32 @@ class EventBase : private boost::noncopyable,
   // allow runOnDestruction() to be called from any threads
   std::mutex onDestructionCallbacksMutex_;
 
-#if !defined(ANDROID) && !defined(__ANDROID__) && !defined(__APPLE__)
+  // allow runAfterDrain() to be called from any threads
+  std::mutex runAfterDrainCallbacksMutex_;
+
   // see EventBaseLocal
   friend class detail::EventBaseLocalBase;
   template <typename T> friend class EventBaseLocal;
   std::mutex localStorageMutex_;
   std::unordered_map<uint64_t, std::shared_ptr<void>> localStorage_;
-  std::unordered_set<detail::EventBaseLocalBase*> localStorageToDtor_;
-#endif
+  std::unordered_set<detail::EventBaseLocalBaseBase*> localStorageToDtor_;
 };
 
+template <typename T>
+bool EventBase::runInEventBaseThread(void (*fn)(T*), T* arg) {
+  return runInEventBaseThread([=] { fn(arg); });
+}
+
+template <typename T>
+bool EventBase::runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) {
+  return runInEventBaseThreadAndWait([=] { fn(arg); });
+}
+
+template <typename T>
+bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(
+    void (*fn)(T*),
+    T* arg) {
+  return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); });
+}
+
 } // folly