X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;ds=inline;f=folly%2Fio%2Fasync%2FEventBase.h;h=50d2aef8cd240bff0558be48c1ce7229ef2b1fa4;hb=cacdd1d54a32c2740fa9dda4ac83b11d2d398820;hp=ec4a6ad653b62e681a0531c0085afa5ed9c76c9c;hpb=c2132bba2689552bfd6e07eb70f36bc91ddb43ef;p=folly.git diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index ec4a6ad6..50d2aef8 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -1,5 +1,5 @@ /* - * Copyright 2014 Facebook, Inc. + * Copyright 2017-present Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,38 +13,61 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #pragma once -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include #include +#include +#include #include -#include #include +#include +#include +#include #include + #include #include -#include -#include // libevent -#include -#include -#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace folly { -typedef std::function Cob; +using Cob = Func; // defined in folly/Executor.h template class NotificationQueue; +namespace detail { +class EventBaseLocalBase; + +class EventBaseLocalBaseBase { + public: + virtual void onEventBaseDestruction(EventBase& evb) = 0; + virtual ~EventBaseLocalBaseBase() = default; +}; +} // namespace detail +template +class EventBaseLocal; + class EventBaseObserver { public: - virtual ~EventBaseObserver() {} + virtual ~EventBaseObserver() = default; virtual uint32_t getSampleRate() const = 0; @@ -71,12 +94,18 @@ class RequestEventBase : public RequestData { std::unique_ptr(new RequestEventBase(eb))); } + bool hasCallback() override { + return false; + } + private: explicit RequestEventBase(EventBase* eb) : eb_(eb) {} EventBase* eb_; static constexpr const char* kContextDataName{"EventBase"}; }; +class VirtualEventBase; + /** * This class is a wrapper for all asynchronous I/O processing functionality * @@ -95,10 +124,12 @@ class RequestEventBase : public RequestData { * EventBase from other threads. When it is safe to call a method from * another thread it is explicitly listed in the method comments. */ -class EventBase : - private boost::noncopyable, public TimeoutManager, public wangle::Executor -{ +class EventBase : private boost::noncopyable, + public TimeoutManager, + public DrivableExecutor { public: + using Func = folly::Function; + /** * A callback interface to use with runInLoop() * @@ -110,40 +141,80 @@ class EventBase : * If a LoopCallback object is destroyed while it is scheduled to be run in * the next loop iteration, it will automatically be cancelled. */ - class LoopCallback { + class LoopCallback + : public boost::intrusive::list_base_hook< + boost::intrusive::link_mode> { public: - virtual ~LoopCallback() {} + virtual ~LoopCallback() = default; virtual void runLoopCallback() noexcept = 0; void cancelLoopCallback() { - hook_.unlink(); + context_.reset(); + unlink(); } bool isLoopCallbackScheduled() const { - return hook_.is_linked(); + return is_linked(); } private: - typedef boost::intrusive::list_member_hook< - boost::intrusive::link_mode > ListHook; - - ListHook hook_; - typedef boost::intrusive::list< LoopCallback, - boost::intrusive::member_hook, boost::intrusive::constant_time_size > List; // EventBase needs access to LoopCallbackList (and therefore to hook_) friend class EventBase; + friend class VirtualEventBase; std::shared_ptr context_; }; + class FunctionLoopCallback : public LoopCallback { + public: + explicit FunctionLoopCallback(Func&& function) + : function_(std::move(function)) {} + + void runLoopCallback() noexcept override { + function_(); + delete this; + } + + private: + Func function_; + }; + + // Like FunctionLoopCallback, but saves one allocation. Use with caution. + // + // The caller is responsible for maintaining the lifetime of this callback + // until after the point at which the contained function is called. + class StackFunctionLoopCallback : public LoopCallback { + public: + explicit StackFunctionLoopCallback(Func&& function) + : function_(std::move(function)) {} + void runLoopCallback() noexcept override { + Func(std::move(function_))(); + } + + private: + Func function_; + }; + /** * Create a new EventBase object. + * + * Same as EventBase(true), which constructs an EventBase that measures time. */ - EventBase(); + EventBase() : EventBase(true) {} + + /** + * Create a new EventBase object. + * + * @param enableTimeMeasurement Informs whether this event base should measure + * time. Disabling it would likely improve + * performance, but will disable some features + * that relies on time-measurement, including: + * observer, max latency and avg loop time. + */ + explicit EventBase(bool enableTimeMeasurement); /** * Create a new EventBase object that will use the specified libevent @@ -151,9 +222,15 @@ class EventBase : * * The EventBase will take ownership of this event_base, and will call * event_base_free(evb) when the EventBase is destroyed. + * + * @param enableTimeMeasurement Informs whether this event base should measure + * time. Disabling it would likely improve + * performance, but will disable some features + * that relies on time-measurement, including: + * observer, max latency and avg loop time. */ - explicit EventBase(event_base* evb); - ~EventBase(); + explicit EventBase(event_base* evb, bool enableTimeMeasurement = true); + ~EventBase() override; /** * Runs the event loop. @@ -258,13 +335,12 @@ class EventBase : void runInLoop(LoopCallback* callback, bool thisIteration = false); /** - * Convenience function to call runInLoop() with a std::function. + * Convenience function to call runInLoop() with a folly::Function. * - * This creates a LoopCallback object to wrap the std::function, and invoke - * the std::function when the loop callback fires. This is slightly more + * This creates a LoopCallback object to wrap the folly::Function, and invoke + * the folly::Function when the loop callback fires. This is slightly more * expensive than defining your own LoopCallback, but more convenient in - * areas that aren't performance sensitive where you just want to use - * std::bind. (std::bind is fairly slow on even by itself.) + * areas that aren't too performance sensitive. * * This method may only be called from the EventBase's thread. This * essentially allows an event handler to schedule an additional callback to @@ -272,9 +348,7 @@ class EventBase : * * Use runInEventBaseThread() to schedule functions from another thread. */ - void runInLoop(const Cob& c, bool thisIteration = false); - - void runInLoop(Cob&& c, bool thisIteration = false); + void runInLoop(Func c, bool thisIteration = false); /** * Adds the given callback to a queue of things run before destruction @@ -321,21 +395,17 @@ class EventBase : * @return Returns true if the function was successfully scheduled, or false * if there was an error scheduling the function. */ - template - bool runInEventBaseThread(void (*fn)(T*), T* arg) { - return runInEventBaseThread(reinterpret_cast(fn), - reinterpret_cast(arg)); - } - - bool runInEventBaseThread(void (*fn)(void*), void* arg); + template + bool runInEventBaseThread(void (*fn)(T*), T* arg); /** * Run the specified function in the EventBase's thread * - * This version of runInEventBaseThread() takes a std::function object. - * Note that this is less efficient than the version that takes a plain - * function pointer and void* argument, as it has to allocate memory to copy - * the std::function object. + * This version of runInEventBaseThread() takes a folly::Function object. + * Note that this may be less efficient than the version that takes a plain + * function pointer and void* argument, if moving the function is expensive + * (e.g., if it wraps a lambda which captures some values with expensive move + * constructors). * * If the loop is terminated (and never later restarted) before it has a * chance to run the requested function, the function will be run upon the @@ -343,33 +413,50 @@ class EventBase : * * The function must not throw any exceptions. */ - bool runInEventBaseThread(const Cob& fn); + bool runInEventBaseThread(Func fn); - /** - * Runs the given Cob at some time after the specified number of - * milliseconds. (No guarantees exactly when.) - * - * @return true iff the cob was successfully registered. + /* + * Like runInEventBaseThread, but the caller waits for the callback to be + * executed. + */ + template + bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg); + + /* + * Like runInEventBaseThread, but the caller waits for the callback to be + * executed. + */ + bool runInEventBaseThreadAndWait(Func fn); + + /* + * Like runInEventBaseThreadAndWait, except if the caller is already in the + * event base thread, the functor is simply run inline. */ - bool runAfterDelay( - const Cob& c, - int milliseconds, - TimeoutManager::InternalEnum = TimeoutManager::InternalEnum::NORMAL); + template + bool runImmediatelyOrRunInEventBaseThreadAndWait(void (*fn)(T*), T* arg); + + /* + * Like runInEventBaseThreadAndWait, except if the caller is already in the + * event base thread, the functor is simply run inline. + */ + bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn); /** * Set the maximum desired latency in us and provide a callback which will be * called when that latency is exceeded. + * OBS: This functionality depends on time-measurement. */ - void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) { + void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob) { + assert(enableTimeMeasurement_); maxLatency_ = maxLatency; - maxLatencyCob_ = maxLatencyCob; + maxLatencyCob_ = std::move(maxLatencyCob); } /** * Set smoothing coefficient for loop load average; # of milliseconds * for exp(-1) (1/2.71828...) decay. */ - void setLoadAvgMsec(uint32_t ms); + void setLoadAvgMsec(std::chrono::milliseconds ms); /** * reset the load average to a desired value @@ -380,6 +467,7 @@ class EventBase : * Get the average loop time in microseconds (an exponentially-smoothed ave) */ double getAvgLoopTime() const { + assert(enableTimeMeasurement_); return avgLoopTime_.get(); } @@ -387,7 +475,7 @@ class EventBase : * check if the event base loop is running. */ bool isRunning() const { - return loopThread_.load(std::memory_order_relaxed) != 0; + return loopThread_.load(std::memory_order_relaxed) != std::thread::id(); } /** @@ -395,7 +483,7 @@ class EventBase : */ void waitUntilRunning(); - int getNotificationQueueSize() const; + size_t getNotificationQueueSize() const; void setMaxReadAtOnce(uint32_t maxAtOnce); @@ -405,12 +493,31 @@ class EventBase : */ bool isInEventBaseThread() const { auto tid = loopThread_.load(std::memory_order_relaxed); - return tid == 0 || pthread_equal(tid, pthread_self()); + return tid == std::thread::id() || tid == std::this_thread::get_id(); } bool inRunningEventBaseThread() const { - return pthread_equal( - loopThread_.load(std::memory_order_relaxed), pthread_self()); + return loopThread_.load(std::memory_order_relaxed) == + std::this_thread::get_id(); + } + + /** + * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for + * dcheckIsInEventBaseThread), but it prints more information on + * failure. + */ + void checkIsInEventBaseThread() const; + void dcheckIsInEventBaseThread() const { + if (kIsDebug) { + checkIsInEventBaseThread(); + } + } + + HHWheelTimer& timer() { + if (!wheelTimer_) { + wheelTimer_ = HHWheelTimer::newTimer(this); + } + return *wheelTimer_.get(); } // --------- interface to underlying libevent base ------------ @@ -429,21 +536,23 @@ class EventBase : * first handler fired within that cycle. * */ - bool bumpHandlingTime(); + void bumpHandlingTime() final; class SmoothLoopTime { public: - explicit SmoothLoopTime(uint64_t timeInterval) - : expCoeff_(-1.0/timeInterval) - , value_(0.0) - , oldBusyLeftover_(0) { + explicit SmoothLoopTime(std::chrono::microseconds timeInterval) + : expCoeff_(-1.0 / timeInterval.count()), + value_(0.0), + oldBusyLeftover_(0) { VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__; } - void setTimeInterval(uint64_t timeInterval); + void setTimeInterval(std::chrono::microseconds timeInterval); void reset(double value = 0.0); - void addSample(int64_t idle, int64_t busy); + void addSample( + std::chrono::microseconds idle, + std::chrono::microseconds busy); double get() const { return value_; @@ -454,13 +563,13 @@ class EventBase : } private: - double expCoeff_; - double value_; - int64_t oldBusyLeftover_; + double expCoeff_; + double value_; + std::chrono::microseconds oldBusyLeftover_; }; - void setObserver( - const std::shared_ptr& observer) { + void setObserver(const std::shared_ptr& observer) { + assert(enableTimeMeasurement_); observer_ = observer; } @@ -468,6 +577,23 @@ class EventBase : return observer_; } + /** + * Setup execution observation/instrumentation for every EventHandler + * executed in this EventBase. + * + * @param executionObserver EventHandle's execution observer. + */ + void setExecutionObserver(ExecutionObserver* observer) { + executionObserver_ = observer; + } + + /** + * Gets the execution observer associated with this EventBase. + */ + ExecutionObserver* getExecutionObserver() { + return executionObserver_; + } + /** * Set the name of the thread that runs this event base. */ @@ -478,73 +604,81 @@ class EventBase : */ const std::string& getName(); - /// Implements the wangle::Executor interface + /// Implements the Executor interface void add(Cob fn) override { // runInEventBaseThread() takes a const&, // so no point in doing std::move here. - runInEventBaseThread(fn); + runInEventBaseThread(std::move(fn)); } - private: + /// Implements the DrivableExecutor interface + void drive() override { + ++loopKeepAliveCount_; + SCOPE_EXIT { + --loopKeepAliveCount_; + }; + loopOnce(); + } + + /// Returns you a handle which make loop() behave like loopForever() until + /// destroyed. loop() will return to its original behavior only when all + /// loop keep-alives are released. + KeepAlive getKeepAliveToken() override { + keepAliveAcquire(); + return makeKeepAlive(); + } // TimeoutManager - void attachTimeoutManager(AsyncTimeout* obj, - TimeoutManager::InternalEnum internal); + void attachTimeoutManager( + AsyncTimeout* obj, + TimeoutManager::InternalEnum internal) final; - void detachTimeoutManager(AsyncTimeout* obj); + void detachTimeoutManager(AsyncTimeout* obj) final; - bool scheduleTimeout(AsyncTimeout* obj, std::chrono::milliseconds timeout); + bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout) + final; - void cancelTimeout(AsyncTimeout* obj); + void cancelTimeout(AsyncTimeout* obj) final; - bool isInTimeoutManagerThread() { + bool isInTimeoutManagerThread() final { return isInEventBaseThread(); } - // Helper class used to short circuit runInEventBaseThread - class RunInLoopCallback : public LoopCallback { - public: - RunInLoopCallback(void (*fn)(void*), void* arg); - void runLoopCallback() noexcept; + // Returns a VirtualEventBase attached to this EventBase. Can be used to + // pass to APIs which expect VirtualEventBase. This VirtualEventBase will be + // destroyed together with the EventBase. + // + // Any number of VirtualEventBases instances may be independently constructed, + // which are backed by this EventBase. This method should be only used if you + // don't need to manage the life time of the VirtualEventBase used. + folly::VirtualEventBase& getVirtualEventBase(); + + protected: + void keepAliveAcquire() override { + if (inRunningEventBaseThread()) { + loopKeepAliveCount_++; + } else { + loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed); + } + } - private: - void (*fn_)(void*); - void* arg_; - }; + void keepAliveRelease() override { + if (!inRunningEventBaseThread()) { + return add([=] { keepAliveRelease(); }); + } + loopKeepAliveCount_--; + } + + private: + void applyLoopKeepAlive(); + + ssize_t loopKeepAliveCount(); /* * Helper function that tells us whether we have already handled * some event/timeout/callback in this loop iteration. */ - bool nothingHandledYet(); - - // --------- libevent callbacks (not for client use) ------------ - - static void runFunctionPtr(std::function* fn); - - // small object used as a callback arg with enough info to execute the - // appropriate client-provided Cob - class CobTimeout : public AsyncTimeout { - public: - CobTimeout(EventBase* b, const Cob& c, TimeoutManager::InternalEnum in) - : AsyncTimeout(b, in), cob_(c) {} - - virtual void timeoutExpired() noexcept; - - private: - Cob cob_; - - public: - typedef boost::intrusive::list_member_hook< - boost::intrusive::link_mode > ListHook; - - ListHook hook; - - typedef boost::intrusive::list< - CobTimeout, - boost::intrusive::member_hook, - boost::intrusive::constant_time_size > List; - }; + bool nothingHandledYet() const noexcept; typedef LoopCallback::List LoopCallbackList; class FunctionRunner; @@ -552,11 +686,12 @@ class EventBase : bool loopBody(int flags = 0); // executes any callbacks queued by runInLoop(); returns false if none found - bool runLoopCallbacks(bool setContext = true); + bool runLoopCallbacks(); void initNotificationQueue(); - CobTimeout::List pendingCobTimeouts_; + // should only be accessed through public getter + HHWheelTimer::UniquePtr wheelTimer_; LoopCallbackList loopCallbacks_; LoopCallbackList runBeforeLoopCallbacks_; @@ -570,25 +705,25 @@ class EventBase : // stop_ is set by terminateLoopSoon() and is used by the main loop // to determine if it should exit - bool stop_; + std::atomic stop_; // The ID of the thread running the main loop. - // 0 if loop is not running. - // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or - // even that atomic is valid), but that's how it is - // everywhere (at least on Linux, FreeBSD, and OSX). - std::atomic loopThread_; + // std::thread::id{} if loop is not running. + std::atomic loopThread_; // pointer to underlying event_base class doing the heavy lifting event_base* evb_; // A notification queue for runInEventBaseThread() to use // to send function requests to the EventBase thread. - std::unique_ptr>> queue_; + std::unique_ptr> queue_; std::unique_ptr fnRunner_; + ssize_t loopKeepAliveCount_{0}; + std::atomic loopKeepAliveCountAtomic_{0}; + bool loopKeepAliveActive_{false}; // limit for latency in microseconds (0 disables) - int64_t maxLatency_; + std::chrono::microseconds maxLatency_; // exponentially-smoothed average loop time for latency-limiting SmoothLoopTime avgLoopTime_; @@ -599,23 +734,61 @@ class EventBase : SmoothLoopTime maxLatencyLoopTime_; // callback called when latency limit is exceeded - Cob maxLatencyCob_; + Func maxLatencyCob_; - // we'll wait this long before running deferred callbacks if the event - // loop is idle. - static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms + // Enables/disables time measurements in loopBody(). if disabled, the + // following functionality that relies on time-measurement, will not + // be supported: avg loop time, observer and max latency. + const bool enableTimeMeasurement_; // Wrap-around loop counter to detect beginning of each loop uint64_t nextLoopCnt_; uint64_t latestLoopCnt_; - uint64_t startWork_; + std::chrono::steady_clock::time_point startWork_; + // Prevent undefined behavior from invoking event_base_loop() reentrantly. + // This is needed since many projects use libevent-1.4, which lacks commit + // b557b175c00dc462c1fce25f6e7dd67121d2c001 from + // https://github.com/libevent/libevent/. + bool invokingLoop_{false}; // Observer to export counters std::shared_ptr observer_; uint32_t observerSampleCount_; + // EventHandler's execution observer. + ExecutionObserver* executionObserver_; + // Name of the thread running this EventBase std::string name_; + + // allow runOnDestruction() to be called from any threads + std::mutex onDestructionCallbacksMutex_; + + // see EventBaseLocal + friend class detail::EventBaseLocalBase; + template friend class EventBaseLocal; + std::unordered_map> localStorage_; + std::unordered_set localStorageToDtor_; + + folly::once_flag virtualEventBaseInitFlag_; + std::unique_ptr virtualEventBase_; }; -} // folly +template +bool EventBase::runInEventBaseThread(void (*fn)(T*), T* arg) { + return runInEventBaseThread([=] { fn(arg); }); +} + +template +bool EventBase::runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) { + return runInEventBaseThreadAndWait([=] { fn(arg); }); +} + +template +bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait( + void (*fn)(T*), + T* arg) { + return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); }); +} + +} // namespace folly