/*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#pragma once
-#include <glog/logging.h>
-#include <folly/io/async/AsyncTimeout.h>
-#include <folly/io/async/TimeoutManager.h>
-#include <folly/wangle/Executor.h>
-#include <memory>
-#include <stack>
+#include <atomic>
+#include <cstdlib>
+#include <errno.h>
+#include <functional>
#include <list>
+#include <math.h>
+#include <memory>
+#include <mutex>
#include <queue>
-#include <cstdlib>
#include <set>
+#include <stack>
+#include <unordered_map>
+#include <unordered_set>
#include <utility>
+
#include <boost/intrusive/list.hpp>
#include <boost/utility.hpp>
-#include <functional>
+
+#include <folly/Executor.h>
+#include <folly/Function.h>
+#include <folly/Portability.h>
+#include <folly/experimental/ExecutionObserver.h>
+#include <folly/futures/DrivableExecutor.h>
+#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/async/HHWheelTimer.h>
+#include <folly/io/async/Request.h>
+#include <folly/io/async/TimeoutManager.h>
+#include <folly/portability/PThread.h>
+#include <glog/logging.h>
+
#include <event.h> // libevent
-#include <errno.h>
-#include <math.h>
-#include <atomic>
namespace folly {
template <typename MessageT>
class NotificationQueue;
+namespace detail {
+class EventBaseLocalBase;
+
+class EventBaseLocalBaseBase {
+ public:
+ virtual void onEventBaseDestruction(EventBase& evb) = 0;
+ virtual ~EventBaseLocalBaseBase() = default;
+};
+}
+template <typename T>
+class EventBaseLocal;
+
class EventBaseObserver {
public:
- virtual ~EventBaseObserver() {}
+ virtual ~EventBaseObserver() = default;
virtual uint32_t getSampleRate() const = 0;
int64_t busyTime, int64_t idleTime) = 0;
};
+// Helper class that sets and retrieves the EventBase associated with a given
+// request via RequestContext. See Request.h for that mechanism.
+class RequestEventBase : public RequestData {
+ public:
+ static EventBase* get() {
+ auto data = dynamic_cast<RequestEventBase*>(
+ RequestContext::get()->getContextData(kContextDataName));
+ if (!data) {
+ return nullptr;
+ }
+ return data->eb_;
+ }
+
+ static void set(EventBase* eb) {
+ RequestContext::get()->setContextData(
+ kContextDataName,
+ std::unique_ptr<RequestEventBase>(new RequestEventBase(eb)));
+ }
+
+ private:
+ explicit RequestEventBase(EventBase* eb) : eb_(eb) {}
+ EventBase* eb_;
+ static constexpr const char* kContextDataName{"EventBase"};
+};
+
/**
* This class is a wrapper for all asynchronous I/O processing functionality
*
* EventBase from other threads. When it is safe to call a method from
* another thread it is explicitly listed in the method comments.
*/
-class EventBase :
- private boost::noncopyable, public TimeoutManager, public wangle::Executor
-{
+class EventBase : private boost::noncopyable,
+ public TimeoutManager,
+ public DrivableExecutor {
public:
+ using Func = folly::Function<void()>;
+
/**
* A callback interface to use with runInLoop()
*
*/
class LoopCallback {
public:
- virtual ~LoopCallback() {}
+ virtual ~LoopCallback() = default;
virtual void runLoopCallback() noexcept = 0;
void cancelLoopCallback() {
/**
* Create a new EventBase object.
+ *
+ * @param enableTimeMeasurement Informs whether this event base should measure
+ * time. Disabling it would likely improve
+ * performance, but will disable some features
+ * that relies on time-measurement, including:
+ * observer, max latency and avg loop time.
*/
- EventBase();
+ explicit EventBase(bool enableTimeMeasurement = true);
/**
* Create a new EventBase object that will use the specified libevent
*
* The EventBase will take ownership of this event_base, and will call
* event_base_free(evb) when the EventBase is destroyed.
+ *
+ * @param enableTimeMeasurement Informs whether this event base should measure
+ * time. Disabling it would likely improve
+ * performance, but will disable some features
+ * that relies on time-measurement, including:
+ * observer, max latency and avg loop time.
*/
- explicit EventBase(event_base* evb);
+ explicit EventBase(event_base* evb, bool enableTimeMeasurement = true);
~EventBase();
/**
void runInLoop(LoopCallback* callback, bool thisIteration = false);
/**
- * Convenience function to call runInLoop() with a std::function.
+ * Convenience function to call runInLoop() with a folly::Function.
*
- * This creates a LoopCallback object to wrap the std::function, and invoke
- * the std::function when the loop callback fires. This is slightly more
+ * This creates a LoopCallback object to wrap the folly::Function, and invoke
+ * the folly::Function when the loop callback fires. This is slightly more
* expensive than defining your own LoopCallback, but more convenient in
- * areas that aren't performance sensitive where you just want to use
- * std::bind. (std::bind is fairly slow on even by itself.)
+ * areas that aren't too performance sensitive.
*
* This method may only be called from the EventBase's thread. This
* essentially allows an event handler to schedule an additional callback to
*
* Use runInEventBaseThread() to schedule functions from another thread.
*/
- void runInLoop(const Cob& c, bool thisIteration = false);
-
- void runInLoop(Cob&& c, bool thisIteration = false);
+ void runInLoop(Func c, bool thisIteration = false);
/**
* Adds the given callback to a queue of things run before destruction
*/
void runOnDestruction(LoopCallback* callback);
+ /**
+ * Adds the given callback to a queue of things run after the notification
+ * queue is drained before the destruction of current EventBase.
+ *
+ * Note: will be called from the thread that invoked EventBase destructor,
+ * after the final run of loop callbacks.
+ */
+ void runAfterDrain(Func cob);
+
/**
* Adds a callback that will run immediately *before* the event loop.
* This is very similar to runInLoop(), but will not cause the loop to break:
* @return Returns true if the function was successfully scheduled, or false
* if there was an error scheduling the function.
*/
- template<typename T>
- bool runInEventBaseThread(void (*fn)(T*), T* arg) {
- return runInEventBaseThread(reinterpret_cast<void (*)(void*)>(fn),
- reinterpret_cast<void*>(arg));
- }
-
- bool runInEventBaseThread(void (*fn)(void*), void* arg);
+ template <typename T>
+ bool runInEventBaseThread(void (*fn)(T*), T* arg);
/**
* Run the specified function in the EventBase's thread
*
- * This version of runInEventBaseThread() takes a std::function object.
- * Note that this is less efficient than the version that takes a plain
- * function pointer and void* argument, as it has to allocate memory to copy
- * the std::function object.
+ * This version of runInEventBaseThread() takes a folly::Function object.
+ * Note that this may be less efficient than the version that takes a plain
+ * function pointer and void* argument, if moving the function is expensive
+ * (e.g., if it wraps a lambda which captures some values with expensive move
+ * constructors).
*
* If the loop is terminated (and never later restarted) before it has a
* chance to run the requested function, the function will be run upon the
*
* The function must not throw any exceptions.
*/
- bool runInEventBaseThread(const Cob& fn);
+ bool runInEventBaseThread(Func fn);
+
+ /*
+ * Like runInEventBaseThread, but the caller waits for the callback to be
+ * executed.
+ */
+ template <typename T>
+ bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg);
+
+ /*
+ * Like runInEventBaseThread, but the caller waits for the callback to be
+ * executed.
+ */
+ bool runInEventBaseThreadAndWait(Func fn);
+
+ /*
+ * Like runInEventBaseThreadAndWait, except if the caller is already in the
+ * event base thread, the functor is simply run inline.
+ */
+ template <typename T>
+ bool runImmediatelyOrRunInEventBaseThreadAndWait(void (*fn)(T*), T* arg);
+
+ /*
+ * Like runInEventBaseThreadAndWait, except if the caller is already in the
+ * event base thread, the functor is simply run inline.
+ */
+ bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn);
/**
* Runs the given Cob at some time after the specified number of
* milliseconds. (No guarantees exactly when.)
*
- * @return true iff the cob was successfully registered.
+ * Throws a std::system_error if an error occurs.
*/
- bool runAfterDelay(
- const Cob& c,
- int milliseconds,
- TimeoutManager::InternalEnum = TimeoutManager::InternalEnum::NORMAL);
+ void runAfterDelay(
+ Func c,
+ uint32_t milliseconds,
+ TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
+
+ /**
+ * @see tryRunAfterDelay for more details
+ *
+ * @return true iff the cob was successfully registered.
+ *
+ * */
+ bool tryRunAfterDelay(
+ Func cob,
+ uint32_t milliseconds,
+ TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL);
/**
* Set the maximum desired latency in us and provide a callback which will be
* called when that latency is exceeded.
+ * OBS: This functionality depends on time-measurement.
*/
- void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) {
+ void setMaxLatency(int64_t maxLatency, Func maxLatencyCob) {
+ assert(enableTimeMeasurement_);
maxLatency_ = maxLatency;
- maxLatencyCob_ = maxLatencyCob;
+ maxLatencyCob_ = std::move(maxLatencyCob);
}
+
/**
* Set smoothing coefficient for loop load average; # of milliseconds
* for exp(-1) (1/2.71828...) decay.
* Get the average loop time in microseconds (an exponentially-smoothed ave)
*/
double getAvgLoopTime() const {
+ assert(enableTimeMeasurement_);
return avgLoopTime_.get();
}
loopThread_.load(std::memory_order_relaxed), pthread_self());
}
+ HHWheelTimer& timer() {
+ if (!wheelTimer_) {
+ wheelTimer_ = HHWheelTimer::newTimer(this, std::chrono::milliseconds(1));
+ }
+ return *wheelTimer_.get();
+ }
+
// --------- interface to underlying libevent base ------------
// Avoid using these functions if possible. These functions are not
// guaranteed to always be present if we ever provide alternative EventBase
* first handler fired within that cycle.
*
*/
- bool bumpHandlingTime();
+ void bumpHandlingTime() override final;
class SmoothLoopTime {
public:
int64_t oldBusyLeftover_;
};
- void setObserver(
- const std::shared_ptr<EventBaseObserver>& observer) {
+ void setObserver(const std::shared_ptr<EventBaseObserver>& observer) {
+ assert(enableTimeMeasurement_);
observer_ = observer;
}
return observer_;
}
+ /**
+ * Setup execution observation/instrumentation for every EventHandler
+ * executed in this EventBase.
+ *
+ * @param executionObserver EventHandle's execution observer.
+ */
+ void setExecutionObserver(ExecutionObserver* observer) {
+ executionObserver_ = observer;
+ }
+
+ /**
+ * Gets the execution observer associated with this EventBase.
+ */
+ ExecutionObserver* getExecutionObserver() {
+ return executionObserver_;
+ }
+
/**
* Set the name of the thread that runs this event base.
*/
*/
const std::string& getName();
- /// Implements the wangle::Executor interface
+ /// Implements the Executor interface
void add(Cob fn) override {
// runInEventBaseThread() takes a const&,
// so no point in doing std::move here.
- runInEventBaseThread(fn);
+ runInEventBaseThread(std::move(fn));
}
- private:
+ /// Implements the DrivableExecutor interface
+ void drive() override {
+ loopOnce();
+ }
+ struct LoopKeepAliveDeleter {
+ void operator()(EventBase* evb) {
+ DCHECK(evb->isInEventBaseThread());
+ evb->loopKeepAliveCount_--;
+ }
+ };
+ using LoopKeepAlive = std::unique_ptr<EventBase, LoopKeepAliveDeleter>;
+
+ /// Returns you a handle which make loop() behave like loopForever() until
+ /// destroyed. loop() will return to its original behavior only when all
+ /// loop keep-alives are released. Loop holder is safe to release only from
+ /// EventBase thread.
+ ///
+ /// May return no op LoopKeepAlive if loopForever() is already running.
+ LoopKeepAlive loopKeepAlive() {
+ DCHECK(isInEventBaseThread());
+ loopKeepAliveCount_++;
+ return LoopKeepAlive(this);
+ }
+
+ private:
// TimeoutManager
void attachTimeoutManager(AsyncTimeout* obj,
- TimeoutManager::InternalEnum internal);
+ TimeoutManager::InternalEnum internal) override;
- void detachTimeoutManager(AsyncTimeout* obj);
+ void detachTimeoutManager(AsyncTimeout* obj) override;
- bool scheduleTimeout(AsyncTimeout* obj, std::chrono::milliseconds timeout);
+ bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout)
+ override;
- void cancelTimeout(AsyncTimeout* obj);
+ void cancelTimeout(AsyncTimeout* obj) override;
- bool isInTimeoutManagerThread() {
+ bool isInTimeoutManagerThread() override final {
return isInEventBaseThread();
}
- // Helper class used to short circuit runInEventBaseThread
- class RunInLoopCallback : public LoopCallback {
- public:
- RunInLoopCallback(void (*fn)(void*), void* arg);
- void runLoopCallback() noexcept;
-
- private:
- void (*fn_)(void*);
- void* arg_;
- };
+ void applyLoopKeepAlive();
/*
* Helper function that tells us whether we have already handled
* some event/timeout/callback in this loop iteration.
*/
- bool nothingHandledYet();
-
- // --------- libevent callbacks (not for client use) ------------
-
- static void runFunctionPtr(std::function<void()>* fn);
+ bool nothingHandledYet() const noexcept;
// small object used as a callback arg with enough info to execute the
// appropriate client-provided Cob
class CobTimeout : public AsyncTimeout {
public:
- CobTimeout(EventBase* b, const Cob& c, TimeoutManager::InternalEnum in)
- : AsyncTimeout(b, in), cob_(c) {}
+ CobTimeout(EventBase* b, Func c, TimeoutManager::InternalEnum in)
+ : AsyncTimeout(b, in), cob_(std::move(c)) {}
virtual void timeoutExpired() noexcept;
private:
- Cob cob_;
+ Func cob_;
public:
typedef boost::intrusive::list_member_hook<
void initNotificationQueue();
+ // should only be accessed through public getter
+ HHWheelTimer::UniquePtr wheelTimer_;
+
CobTimeout::List pendingCobTimeouts_;
LoopCallbackList loopCallbacks_;
LoopCallbackList runBeforeLoopCallbacks_;
LoopCallbackList onDestructionCallbacks_;
+ LoopCallbackList runAfterDrainCallbacks_;
// This will be null most of the time, but point to currentCallbacks
// if we are in the middle of running loop callbacks, such that
// stop_ is set by terminateLoopSoon() and is used by the main loop
// to determine if it should exit
- bool stop_;
+ std::atomic<bool> stop_;
// The ID of the thread running the main loop.
// 0 if loop is not running.
// A notification queue for runInEventBaseThread() to use
// to send function requests to the EventBase thread.
- std::unique_ptr<NotificationQueue<std::pair<void (*)(void*), void*>>> queue_;
+ std::unique_ptr<NotificationQueue<Func>> queue_;
std::unique_ptr<FunctionRunner> fnRunner_;
+ size_t loopKeepAliveCount_{0};
+ bool loopKeepAliveActive_{false};
// limit for latency in microseconds (0 disables)
int64_t maxLatency_;
SmoothLoopTime maxLatencyLoopTime_;
// callback called when latency limit is exceeded
- Cob maxLatencyCob_;
+ Func maxLatencyCob_;
+
+ // Enables/disables time measurements in loopBody(). if disabled, the
+ // following functionality that relies on time-measurement, will not
+ // be supported: avg loop time, observer and max latency.
+ const bool enableTimeMeasurement_;
// we'll wait this long before running deferred callbacks if the event
// loop is idle.
uint64_t nextLoopCnt_;
uint64_t latestLoopCnt_;
uint64_t startWork_;
+ // Prevent undefined behavior from invoking event_base_loop() reentrantly.
+ // This is needed since many projects use libevent-1.4, which lacks commit
+ // b557b175c00dc462c1fce25f6e7dd67121d2c001 from
+ // https://github.com/libevent/libevent/.
+ bool invokingLoop_{false};
// Observer to export counters
std::shared_ptr<EventBaseObserver> observer_;
uint32_t observerSampleCount_;
+ // EventHandler's execution observer.
+ ExecutionObserver* executionObserver_;
+
// Name of the thread running this EventBase
std::string name_;
+
+ // allow runOnDestruction() to be called from any threads
+ std::mutex onDestructionCallbacksMutex_;
+
+ // allow runAfterDrain() to be called from any threads
+ std::mutex runAfterDrainCallbacksMutex_;
+
+ // see EventBaseLocal
+ friend class detail::EventBaseLocalBase;
+ template <typename T> friend class EventBaseLocal;
+ std::mutex localStorageMutex_;
+ std::unordered_map<uint64_t, std::shared_ptr<void>> localStorage_;
+ std::unordered_set<detail::EventBaseLocalBaseBase*> localStorageToDtor_;
};
+template <typename T>
+bool EventBase::runInEventBaseThread(void (*fn)(T*), T* arg) {
+ return runInEventBaseThread([=] { fn(arg); });
+}
+
+template <typename T>
+bool EventBase::runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) {
+ return runInEventBaseThreadAndWait([=] { fn(arg); });
+}
+
+template <typename T>
+bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(
+ void (*fn)(T*),
+ T* arg) {
+ return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); });
+}
+
} // folly