/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2014-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#pragma once
#include <atomic>
+#include <cerrno>
+#include <cmath>
#include <cstdlib>
-#include <errno.h>
#include <functional>
#include <list>
-#include <math.h>
#include <memory>
#include <mutex>
#include <queue>
#include <boost/intrusive/list.hpp>
#include <boost/utility.hpp>
+#include <glog/logging.h>
+
#include <folly/Executor.h>
+#include <folly/Function.h>
#include <folly/Portability.h>
+#include <folly/ScopeGuard.h>
+#include <folly/executors/DrivableExecutor.h>
#include <folly/experimental/ExecutionObserver.h>
-#include <folly/futures/DrivableExecutor.h>
#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/async/HHWheelTimer.h>
#include <folly/io/async/Request.h>
#include <folly/io/async/TimeoutManager.h>
-#include <glog/logging.h>
-
-#include <event.h> // libevent
+#include <folly/portability/Event.h>
+#include <folly/synchronization/CallOnce.h>
namespace folly {
-typedef std::function<void()> Cob;
+using Cob = Func; // defined in folly/Executor.h
template <typename MessageT>
class NotificationQueue;
virtual void onEventBaseDestruction(EventBase& evb) = 0;
virtual ~EventBaseLocalBaseBase() = default;
};
-}
+} // namespace detail
template <typename T>
class EventBaseLocal;
std::unique_ptr<RequestEventBase>(new RequestEventBase(eb)));
}
+ bool hasCallback() override {
+ return false;
+ }
+
private:
explicit RequestEventBase(EventBase* eb) : eb_(eb) {}
EventBase* eb_;
static constexpr const char* kContextDataName{"EventBase"};
};
+class VirtualEventBase;
+
/**
* This class is a wrapper for all asynchronous I/O processing functionality
*
public TimeoutManager,
public DrivableExecutor {
public:
+ using Func = folly::Function<void()>;
+
/**
* A callback interface to use with runInLoop()
*
* If a LoopCallback object is destroyed while it is scheduled to be run in
* the next loop iteration, it will automatically be cancelled.
*/
- class LoopCallback {
+ class LoopCallback
+ : public boost::intrusive::list_base_hook<
+ boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
public:
virtual ~LoopCallback() = default;
virtual void runLoopCallback() noexcept = 0;
void cancelLoopCallback() {
- hook_.unlink();
+ context_.reset();
+ unlink();
}
bool isLoopCallbackScheduled() const {
- return hook_.is_linked();
+ return is_linked();
}
private:
- typedef boost::intrusive::list_member_hook<
- boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
-
- ListHook hook_;
-
typedef boost::intrusive::list<
LoopCallback,
- boost::intrusive::member_hook<LoopCallback, ListHook,
- &LoopCallback::hook_>,
boost::intrusive::constant_time_size<false> > List;
// EventBase needs access to LoopCallbackList (and therefore to hook_)
friend class EventBase;
+ friend class VirtualEventBase;
std::shared_ptr<RequestContext> context_;
};
+ class FunctionLoopCallback : public LoopCallback {
+ public:
+ explicit FunctionLoopCallback(Func&& function)
+ : function_(std::move(function)) {}
+
+ void runLoopCallback() noexcept override {
+ function_();
+ delete this;
+ }
+
+ private:
+ Func function_;
+ };
+
+ // Like FunctionLoopCallback, but saves one allocation. Use with caution.
+ //
+ // The caller is responsible for maintaining the lifetime of this callback
+ // until after the point at which the contained function is called.
+ class StackFunctionLoopCallback : public LoopCallback {
+ public:
+ explicit StackFunctionLoopCallback(Func&& function)
+ : function_(std::move(function)) {}
+ void runLoopCallback() noexcept override {
+ Func(std::move(function_))();
+ }
+
+ private:
+ Func function_;
+ };
+
+ /**
+ * Create a new EventBase object.
+ *
+ * Same as EventBase(true), which constructs an EventBase that measures time.
+ */
+ EventBase() : EventBase(true) {}
+
/**
* Create a new EventBase object.
*
* that relies on time-measurement, including:
* observer, max latency and avg loop time.
*/
- explicit EventBase(bool enableTimeMeasurement = true);
+ explicit EventBase(bool enableTimeMeasurement);
/**
* Create a new EventBase object that will use the specified libevent
* observer, max latency and avg loop time.
*/
explicit EventBase(event_base* evb, bool enableTimeMeasurement = true);
- ~EventBase();
+ ~EventBase() override;
/**
* Runs the event loop.
void runInLoop(LoopCallback* callback, bool thisIteration = false);
/**
- * Convenience function to call runInLoop() with a std::function.
+ * Convenience function to call runInLoop() with a folly::Function.
*
- * This creates a LoopCallback object to wrap the std::function, and invoke
- * the std::function when the loop callback fires. This is slightly more
+ * This creates a LoopCallback object to wrap the folly::Function, and invoke
+ * the folly::Function when the loop callback fires. This is slightly more
* expensive than defining your own LoopCallback, but more convenient in
- * areas that aren't performance sensitive where you just want to use
- * std::bind. (std::bind is fairly slow on even by itself.)
+ * areas that aren't too performance sensitive.
*
* This method may only be called from the EventBase's thread. This
* essentially allows an event handler to schedule an additional callback to
*
* Use runInEventBaseThread() to schedule functions from another thread.
*/
- void runInLoop(const Cob& c, bool thisIteration = false);
-
- void runInLoop(Cob&& c, bool thisIteration = false);
+ void runInLoop(Func c, bool thisIteration = false);
/**
* Adds the given callback to a queue of things run before destruction
*/
void runOnDestruction(LoopCallback* callback);
- /**
- * Adds the given callback to a queue of things run after the notification
- * queue is drained before the destruction of current EventBase.
- *
- * Note: will be called from the thread that invoked EventBase destructor,
- * after the final run of loop callbacks.
- */
- void runAfterDrain(Cob&& cob);
-
/**
* Adds a callback that will run immediately *before* the event loop.
* This is very similar to runInLoop(), but will not cause the loop to break:
/**
* Run the specified function in the EventBase's thread
*
- * This version of runInEventBaseThread() takes a std::function object.
- * Note that this is less efficient than the version that takes a plain
- * function pointer and void* argument, as it has to allocate memory to copy
- * the std::function object.
+ * This version of runInEventBaseThread() takes a folly::Function object.
+ * Note that this may be less efficient than the version that takes a plain
+ * function pointer and void* argument, if moving the function is expensive
+ * (e.g., if it wraps a lambda which captures some values with expensive move
+ * constructors).
*
* If the loop is terminated (and never later restarted) before it has a
* chance to run the requested function, the function will be run upon the
*
* The function must not throw any exceptions.
*/
- bool runInEventBaseThread(const Cob& fn);
+ bool runInEventBaseThread(Func fn);
/*
* Like runInEventBaseThread, but the caller waits for the callback to be
* Like runInEventBaseThread, but the caller waits for the callback to be
* executed.
*/
- bool runInEventBaseThreadAndWait(const Cob& fn);
+ bool runInEventBaseThreadAndWait(Func fn);
/*
* Like runInEventBaseThreadAndWait, except if the caller is already in the
* Like runInEventBaseThreadAndWait, except if the caller is already in the
* event base thread, the functor is simply run inline.
*/
- bool runImmediatelyOrRunInEventBaseThreadAndWait(const Cob& fn);
-
- /**
- * Runs the given Cob at some time after the specified number of
- * milliseconds. (No guarantees exactly when.)
- *
- * Throws a std::system_error if an error occurs.
- */
- void runAfterDelay(
- const Cob& c,
- uint32_t milliseconds,
- TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
-
- /**
- * @see tryRunAfterDelay for more details
- *
- * @return true iff the cob was successfully registered.
- *
- * */
- bool tryRunAfterDelay(
- const Cob& cob,
- uint32_t milliseconds,
- TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
+ bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn);
/**
* Set the maximum desired latency in us and provide a callback which will be
* called when that latency is exceeded.
* OBS: This functionality depends on time-measurement.
*/
- void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) {
+ void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob) {
assert(enableTimeMeasurement_);
maxLatency_ = maxLatency;
- maxLatencyCob_ = maxLatencyCob;
+ maxLatencyCob_ = std::move(maxLatencyCob);
}
-
/**
* Set smoothing coefficient for loop load average; # of milliseconds
* for exp(-1) (1/2.71828...) decay.
*/
- void setLoadAvgMsec(uint32_t ms);
+ void setLoadAvgMsec(std::chrono::milliseconds ms);
/**
* reset the load average to a desired value
* check if the event base loop is running.
*/
bool isRunning() const {
- return loopThread_.load(std::memory_order_relaxed) != 0;
+ return loopThread_.load(std::memory_order_relaxed) != std::thread::id();
}
/**
*/
void waitUntilRunning();
- int getNotificationQueueSize() const;
+ size_t getNotificationQueueSize() const;
void setMaxReadAtOnce(uint32_t maxAtOnce);
*/
bool isInEventBaseThread() const {
auto tid = loopThread_.load(std::memory_order_relaxed);
- return tid == 0 || pthread_equal(tid, pthread_self());
+ return tid == std::thread::id() || tid == std::this_thread::get_id();
}
bool inRunningEventBaseThread() const {
- return pthread_equal(
- loopThread_.load(std::memory_order_relaxed), pthread_self());
+ return loopThread_.load(std::memory_order_relaxed) ==
+ std::this_thread::get_id();
+ }
+
+ /**
+ * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for
+ * dcheckIsInEventBaseThread), but it prints more information on
+ * failure.
+ */
+ void checkIsInEventBaseThread() const;
+ void dcheckIsInEventBaseThread() const {
+ if (kIsDebug) {
+ checkIsInEventBaseThread();
+ }
+ }
+
+ HHWheelTimer& timer() {
+ if (!wheelTimer_) {
+ wheelTimer_ = HHWheelTimer::newTimer(this);
+ }
+ return *wheelTimer_.get();
}
// --------- interface to underlying libevent base ------------
* first handler fired within that cycle.
*
*/
- bool bumpHandlingTime() override;
+ void bumpHandlingTime() final;
class SmoothLoopTime {
public:
- explicit SmoothLoopTime(uint64_t timeInterval)
- : expCoeff_(-1.0/timeInterval)
- , value_(0.0)
- , oldBusyLeftover_(0) {
+ explicit SmoothLoopTime(std::chrono::microseconds timeInterval)
+ : expCoeff_(-1.0 / timeInterval.count()),
+ value_(0.0),
+ oldBusyLeftover_(0) {
VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
}
- void setTimeInterval(uint64_t timeInterval);
+ void setTimeInterval(std::chrono::microseconds timeInterval);
void reset(double value = 0.0);
- void addSample(int64_t idle, int64_t busy);
+ void addSample(
+ std::chrono::microseconds idle,
+ std::chrono::microseconds busy);
double get() const {
return value_;
}
private:
- double expCoeff_;
- double value_;
- int64_t oldBusyLeftover_;
+ double expCoeff_;
+ double value_;
+ std::chrono::microseconds oldBusyLeftover_;
};
void setObserver(const std::shared_ptr<EventBaseObserver>& observer) {
void add(Cob fn) override {
// runInEventBaseThread() takes a const&,
// so no point in doing std::move here.
- runInEventBaseThread(fn);
+ runInEventBaseThread(std::move(fn));
}
/// Implements the DrivableExecutor interface
void drive() override {
+ ++loopKeepAliveCount_;
+ SCOPE_EXIT {
+ --loopKeepAliveCount_;
+ };
loopOnce();
}
- private:
+ /// Returns you a handle which make loop() behave like loopForever() until
+ /// destroyed. loop() will return to its original behavior only when all
+ /// loop keep-alives are released.
+ KeepAlive getKeepAliveToken() override {
+ keepAliveAcquire();
+ return makeKeepAlive();
+ }
+
// TimeoutManager
- void attachTimeoutManager(AsyncTimeout* obj,
- TimeoutManager::InternalEnum internal) override;
+ void attachTimeoutManager(
+ AsyncTimeout* obj,
+ TimeoutManager::InternalEnum internal) final;
- void detachTimeoutManager(AsyncTimeout* obj) override;
+ void detachTimeoutManager(AsyncTimeout* obj) final;
bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout)
- override;
+ final;
- void cancelTimeout(AsyncTimeout* obj) override;
+ void cancelTimeout(AsyncTimeout* obj) final;
- bool isInTimeoutManagerThread() override {
+ bool isInTimeoutManagerThread() final {
return isInEventBaseThread();
}
- /*
- * Helper function that tells us whether we have already handled
- * some event/timeout/callback in this loop iteration.
- */
- bool nothingHandledYet();
-
- // --------- libevent callbacks (not for client use) ------------
-
- static void runFunctionPtr(Cob* fn);
-
- // small object used as a callback arg with enough info to execute the
- // appropriate client-provided Cob
- class CobTimeout : public AsyncTimeout {
- public:
- CobTimeout(EventBase* b, const Cob& c, TimeoutManager::InternalEnum in)
- : AsyncTimeout(b, in), cob_(c) {}
-
- virtual void timeoutExpired() noexcept;
+ // Returns a VirtualEventBase attached to this EventBase. Can be used to
+ // pass to APIs which expect VirtualEventBase. This VirtualEventBase will be
+ // destroyed together with the EventBase.
+ //
+ // Any number of VirtualEventBases instances may be independently constructed,
+ // which are backed by this EventBase. This method should be only used if you
+ // don't need to manage the life time of the VirtualEventBase used.
+ folly::VirtualEventBase& getVirtualEventBase();
+
+ protected:
+ void keepAliveAcquire() override {
+ if (inRunningEventBaseThread()) {
+ loopKeepAliveCount_++;
+ } else {
+ loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed);
+ }
+ }
- private:
- Cob cob_;
+ void keepAliveRelease() override {
+ if (!inRunningEventBaseThread()) {
+ return add([=] { keepAliveRelease(); });
+ }
+ loopKeepAliveCount_--;
+ }
- public:
- typedef boost::intrusive::list_member_hook<
- boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
+ private:
+ void applyLoopKeepAlive();
- ListHook hook;
+ ssize_t loopKeepAliveCount();
- typedef boost::intrusive::list<
- CobTimeout,
- boost::intrusive::member_hook<CobTimeout, ListHook, &CobTimeout::hook>,
- boost::intrusive::constant_time_size<false> > List;
- };
+ /*
+ * Helper function that tells us whether we have already handled
+ * some event/timeout/callback in this loop iteration.
+ */
+ bool nothingHandledYet() const noexcept;
typedef LoopCallback::List LoopCallbackList;
class FunctionRunner;
bool loopBody(int flags = 0);
// executes any callbacks queued by runInLoop(); returns false if none found
- bool runLoopCallbacks(bool setContext = true);
+ bool runLoopCallbacks();
void initNotificationQueue();
- CobTimeout::List pendingCobTimeouts_;
+ // should only be accessed through public getter
+ HHWheelTimer::UniquePtr wheelTimer_;
LoopCallbackList loopCallbacks_;
LoopCallbackList runBeforeLoopCallbacks_;
LoopCallbackList onDestructionCallbacks_;
- LoopCallbackList runAfterDrainCallbacks_;
// This will be null most of the time, but point to currentCallbacks
// if we are in the middle of running loop callbacks, such that
std::atomic<bool> stop_;
// The ID of the thread running the main loop.
- // 0 if loop is not running.
- // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or
- // even that atomic<pthread_t> is valid), but that's how it is
- // everywhere (at least on Linux, FreeBSD, and OSX).
- std::atomic<pthread_t> loopThread_;
+ // std::thread::id{} if loop is not running.
+ std::atomic<std::thread::id> loopThread_;
// pointer to underlying event_base class doing the heavy lifting
event_base* evb_;
// A notification queue for runInEventBaseThread() to use
// to send function requests to the EventBase thread.
- std::unique_ptr<NotificationQueue<Cob>> queue_;
+ std::unique_ptr<NotificationQueue<Func>> queue_;
std::unique_ptr<FunctionRunner> fnRunner_;
+ ssize_t loopKeepAliveCount_{0};
+ std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
+ bool loopKeepAliveActive_{false};
// limit for latency in microseconds (0 disables)
- int64_t maxLatency_;
+ std::chrono::microseconds maxLatency_;
// exponentially-smoothed average loop time for latency-limiting
SmoothLoopTime avgLoopTime_;
SmoothLoopTime maxLatencyLoopTime_;
// callback called when latency limit is exceeded
- Cob maxLatencyCob_;
+ Func maxLatencyCob_;
// Enables/disables time measurements in loopBody(). if disabled, the
// following functionality that relies on time-measurement, will not
// be supported: avg loop time, observer and max latency.
const bool enableTimeMeasurement_;
- // we'll wait this long before running deferred callbacks if the event
- // loop is idle.
- static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms
-
// Wrap-around loop counter to detect beginning of each loop
uint64_t nextLoopCnt_;
uint64_t latestLoopCnt_;
- uint64_t startWork_;
+ std::chrono::steady_clock::time_point startWork_;
+ // Prevent undefined behavior from invoking event_base_loop() reentrantly.
+ // This is needed since many projects use libevent-1.4, which lacks commit
+ // b557b175c00dc462c1fce25f6e7dd67121d2c001 from
+ // https://github.com/libevent/libevent/.
+ bool invokingLoop_{false};
// Observer to export counters
std::shared_ptr<EventBaseObserver> observer_;
// allow runOnDestruction() to be called from any threads
std::mutex onDestructionCallbacksMutex_;
- // allow runAfterDrain() to be called from any threads
- std::mutex runAfterDrainCallbacksMutex_;
-
// see EventBaseLocal
friend class detail::EventBaseLocalBase;
template <typename T> friend class EventBaseLocal;
- std::mutex localStorageMutex_;
std::unordered_map<uint64_t, std::shared_ptr<void>> localStorage_;
std::unordered_set<detail::EventBaseLocalBaseBase*> localStorageToDtor_;
-};
-namespace detail {
-
-/**
- * Define a small functor (2 pointers) and specialize
- * std::__is_location_invariant so that std::function does not require
- * memory allocation.
- *
- * std::function<void()> func = SmallFunctor{f, p};
- *
- * TODO(lucian): remove this hack once GCC <= 4.9 are deprecated.
- * In GCC >= 5.0 just use a lambda like:
- *
- * std::function<void()> func = [=] { f(p); };
- *
- * See https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61909
- */
-template <class T>
-struct SmallFunctor {
- void (*fn)(T*);
- T* p;
- void operator()() { fn(p); }
+ folly::once_flag virtualEventBaseInitFlag_;
+ std::unique_ptr<VirtualEventBase> virtualEventBase_;
};
-} // detail
-
template <typename T>
bool EventBase::runInEventBaseThread(void (*fn)(T*), T* arg) {
- return runInEventBaseThread(detail::SmallFunctor<T>{fn, arg});
+ return runInEventBaseThread([=] { fn(arg); });
}
template <typename T>
bool EventBase::runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) {
- return runInEventBaseThreadAndWait(detail::SmallFunctor<T>{fn, arg});
+ return runInEventBaseThreadAndWait([=] { fn(arg); });
}
template <typename T>
-bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(void (*fn)(T*),
- T* arg) {
- return runImmediatelyOrRunInEventBaseThreadAndWait(
- detail::SmallFunctor<T>{fn, arg});
+bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(
+ void (*fn)(T*),
+ T* arg) {
+ return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); });
}
-} // folly
-
-FOLLY_NAMESPACE_STD_BEGIN
-
-/**
- * GCC's libstdc++ uses __is_location_invariant to decide wether to
- * use small object optimization and embed the functor's contents in
- * the std::function object.
- *
- * (gcc 4.9) $ libstdc++-v3/include/std/functional
- * template<typename _Tp>
- * struct __is_location_invariant
- * : integral_constant<bool, (is_pointer<_Tp>::value
- * || is_member_pointer<_Tp>::value)>
- * { };
- *
- * (gcc 5.0) $ libstdc++-v3/include/std/functional
- *
- * template<typename _Tp>
- * struct __is_location_invariant
- * : is_trivially_copyable<_Tp>::type
- * { };
- *
- *
- * NOTE: Forward declare so this doesn't break when using other
- * standard libraries: it just wont have any effect.
- */
-template <typename T>
-struct __is_location_invariant;
-
-template <typename T>
-struct __is_location_invariant<folly::detail::SmallFunctor<T>>
- : public std::true_type {};
-
-FOLLY_NAMESPACE_STD_END
+} // namespace folly