X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2Fasync%2FEventBase.h;h=3399cea2dacbdcc57af56f3318efd1837b65583e;hp=47697fde34835612be6e6f6855bebebd0915abcd;hb=bda67fde120837b77ddab74f23abcb22ae5b3029;hpb=621cc26e0a5033f3bddf8c70c65bc0096b0e099b diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index 47697fde..3399cea2 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,35 +16,41 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include #include +#include +#include #include -#include #include -#include +#include #include -#include +#include #include + #include #include -#include -#include // libevent -#include -#include -#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace folly { -typedef std::function Cob; +using Cob = Func; // defined in folly/Executor.h template class NotificationQueue; @@ -56,7 +62,7 @@ class EventBaseLocalBaseBase { virtual void onEventBaseDestruction(EventBase& evb) = 0; virtual ~EventBaseLocalBaseBase() = default; }; -} +} // namespace detail template class EventBaseLocal; @@ -89,12 +95,18 @@ class RequestEventBase : public RequestData { std::unique_ptr(new RequestEventBase(eb))); } + bool hasCallback() override { + return false; + } + private: explicit RequestEventBase(EventBase* eb) : eb_(eb) {} EventBase* eb_; static constexpr const char* kContextDataName{"EventBase"}; }; +class VirtualEventBase; + /** * This class is a wrapper for all asynchronous I/O processing functionality * @@ -117,6 +129,8 @@ class EventBase : private boost::noncopyable, public TimeoutManager, public DrivableExecutor { public: + using Func = folly::Function; + /** * A callback interface to use with runInLoop() * @@ -128,36 +142,63 @@ class EventBase : private boost::noncopyable, * If a LoopCallback object is destroyed while it is scheduled to be run in * the next loop iteration, it will automatically be cancelled. */ - class LoopCallback { + class LoopCallback + : public boost::intrusive::list_base_hook< + boost::intrusive::link_mode> { public: virtual ~LoopCallback() = default; virtual void runLoopCallback() noexcept = 0; void cancelLoopCallback() { - hook_.unlink(); + context_.reset(); + unlink(); } bool isLoopCallbackScheduled() const { - return hook_.is_linked(); + return is_linked(); } private: - typedef boost::intrusive::list_member_hook< - boost::intrusive::link_mode > ListHook; - - ListHook hook_; - typedef boost::intrusive::list< LoopCallback, - boost::intrusive::member_hook, boost::intrusive::constant_time_size > List; // EventBase needs access to LoopCallbackList (and therefore to hook_) friend class EventBase; + friend class VirtualEventBase; std::shared_ptr context_; }; + class FunctionLoopCallback : public LoopCallback { + public: + explicit FunctionLoopCallback(Func&& function) + : function_(std::move(function)) {} + + void runLoopCallback() noexcept override { + function_(); + delete this; + } + + private: + Func function_; + }; + + // Like FunctionLoopCallback, but saves one allocation. Use with caution. + // + // The caller is responsible for maintaining the lifetime of this callback + // until after the point at which the contained function is called. + class StackFunctionLoopCallback : public LoopCallback { + public: + explicit StackFunctionLoopCallback(Func&& function) + : function_(std::move(function)) {} + void runLoopCallback() noexcept override { + Func(std::move(function_))(); + } + + private: + Func function_; + }; + /** * Create a new EventBase object. * @@ -183,7 +224,7 @@ class EventBase : private boost::noncopyable, * observer, max latency and avg loop time. */ explicit EventBase(event_base* evb, bool enableTimeMeasurement = true); - ~EventBase(); + ~EventBase() override; /** * Runs the event loop. @@ -288,13 +329,12 @@ class EventBase : private boost::noncopyable, void runInLoop(LoopCallback* callback, bool thisIteration = false); /** - * Convenience function to call runInLoop() with a std::function. + * Convenience function to call runInLoop() with a folly::Function. * - * This creates a LoopCallback object to wrap the std::function, and invoke - * the std::function when the loop callback fires. This is slightly more + * This creates a LoopCallback object to wrap the folly::Function, and invoke + * the folly::Function when the loop callback fires. This is slightly more * expensive than defining your own LoopCallback, but more convenient in - * areas that aren't performance sensitive where you just want to use - * std::bind. (std::bind is fairly slow on even by itself.) + * areas that aren't too performance sensitive. * * This method may only be called from the EventBase's thread. This * essentially allows an event handler to schedule an additional callback to @@ -302,9 +342,7 @@ class EventBase : private boost::noncopyable, * * Use runInEventBaseThread() to schedule functions from another thread. */ - void runInLoop(const Cob& c, bool thisIteration = false); - - void runInLoop(Cob&& c, bool thisIteration = false); + void runInLoop(Func c, bool thisIteration = false); /** * Adds the given callback to a queue of things run before destruction @@ -318,15 +356,6 @@ class EventBase : private boost::noncopyable, */ void runOnDestruction(LoopCallback* callback); - /** - * Adds the given callback to a queue of things run after the notification - * queue is drained before the destruction of current EventBase. - * - * Note: will be called from the thread that invoked EventBase destructor, - * after the final run of loop callbacks. - */ - void runAfterDrain(Cob&& cob); - /** * Adds a callback that will run immediately *before* the event loop. * This is very similar to runInLoop(), but will not cause the loop to break: @@ -360,21 +389,17 @@ class EventBase : private boost::noncopyable, * @return Returns true if the function was successfully scheduled, or false * if there was an error scheduling the function. */ - template - bool runInEventBaseThread(void (*fn)(T*), T* arg) { - return runInEventBaseThread(reinterpret_cast(fn), - reinterpret_cast(arg)); - } - - bool runInEventBaseThread(void (*fn)(void*), void* arg); + template + bool runInEventBaseThread(void (*fn)(T*), T* arg); /** * Run the specified function in the EventBase's thread * - * This version of runInEventBaseThread() takes a std::function object. - * Note that this is less efficient than the version that takes a plain - * function pointer and void* argument, as it has to allocate memory to copy - * the std::function object. + * This version of runInEventBaseThread() takes a folly::Function object. + * Note that this may be less efficient than the version that takes a plain + * function pointer and void* argument, if moving the function is expensive + * (e.g., if it wraps a lambda which captures some values with expensive move + * constructors). * * If the loop is terminated (and never later restarted) before it has a * chance to run the requested function, the function will be run upon the @@ -382,96 +407,50 @@ class EventBase : private boost::noncopyable, * * The function must not throw any exceptions. */ - bool runInEventBaseThread(const Cob& fn); - - /* - * Like runInEventBaseThread, but the caller waits for the callback to be - * executed. - */ - template - bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) { - return runInEventBaseThreadAndWait(reinterpret_cast(fn), - reinterpret_cast(arg)); - } + bool runInEventBaseThread(Func fn); /* * Like runInEventBaseThread, but the caller waits for the callback to be * executed. */ - bool runInEventBaseThreadAndWait(void (*fn)(void*), void* arg) { - return runInEventBaseThreadAndWait(std::bind(fn, arg)); - } + template + bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg); /* * Like runInEventBaseThread, but the caller waits for the callback to be * executed. */ - bool runInEventBaseThreadAndWait(const Cob& fn); - - /* - * Like runInEventBaseThreadAndWait, except if the caller is already in the - * event base thread, the functor is simply run inline. - */ - template - bool runImmediatelyOrRunInEventBaseThreadAndWait(void (*fn)(T*), T* arg) { - return runImmediatelyOrRunInEventBaseThreadAndWait( - reinterpret_cast(fn), reinterpret_cast(arg)); - } + bool runInEventBaseThreadAndWait(Func fn); /* * Like runInEventBaseThreadAndWait, except if the caller is already in the * event base thread, the functor is simply run inline. */ - bool runImmediatelyOrRunInEventBaseThreadAndWait( - void (*fn)(void*), void* arg) { - return runImmediatelyOrRunInEventBaseThreadAndWait(std::bind(fn, arg)); - } + template + bool runImmediatelyOrRunInEventBaseThreadAndWait(void (*fn)(T*), T* arg); /* * Like runInEventBaseThreadAndWait, except if the caller is already in the * event base thread, the functor is simply run inline. */ - bool runImmediatelyOrRunInEventBaseThreadAndWait(const Cob& fn); - - /** - * Runs the given Cob at some time after the specified number of - * milliseconds. (No guarantees exactly when.) - * - * Throws a std::system_error if an error occurs. - */ - void runAfterDelay( - const Cob& c, - int milliseconds, - TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL); - - /** - * @see tryRunAfterDelay for more details - * - * @return true iff the cob was successfully registered. - * - * */ - bool tryRunAfterDelay( - const Cob& cob, - int milliseconds, - TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL); + bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn); /** * Set the maximum desired latency in us and provide a callback which will be * called when that latency is exceeded. * OBS: This functionality depends on time-measurement. */ - void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) { + void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob) { assert(enableTimeMeasurement_); maxLatency_ = maxLatency; - maxLatencyCob_ = maxLatencyCob; + maxLatencyCob_ = std::move(maxLatencyCob); } - /** * Set smoothing coefficient for loop load average; # of milliseconds * for exp(-1) (1/2.71828...) decay. */ - void setLoadAvgMsec(uint32_t ms); + void setLoadAvgMsec(std::chrono::milliseconds ms); /** * reset the load average to a desired value @@ -490,7 +469,7 @@ class EventBase : private boost::noncopyable, * check if the event base loop is running. */ bool isRunning() const { - return loopThread_.load(std::memory_order_relaxed) != 0; + return loopThread_.load(std::memory_order_relaxed) != std::thread::id(); } /** @@ -498,7 +477,7 @@ class EventBase : private boost::noncopyable, */ void waitUntilRunning(); - int getNotificationQueueSize() const; + size_t getNotificationQueueSize() const; void setMaxReadAtOnce(uint32_t maxAtOnce); @@ -508,12 +487,31 @@ class EventBase : private boost::noncopyable, */ bool isInEventBaseThread() const { auto tid = loopThread_.load(std::memory_order_relaxed); - return tid == 0 || pthread_equal(tid, pthread_self()); + return tid == std::thread::id() || tid == std::this_thread::get_id(); } bool inRunningEventBaseThread() const { - return pthread_equal( - loopThread_.load(std::memory_order_relaxed), pthread_self()); + return loopThread_.load(std::memory_order_relaxed) == + std::this_thread::get_id(); + } + + /** + * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for + * dcheckIsInEventBaseThread), but it prints more information on + * failure. + */ + void checkIsInEventBaseThread() const; + void dcheckIsInEventBaseThread() const { + if (kIsDebug) { + checkIsInEventBaseThread(); + } + } + + HHWheelTimer& timer() { + if (!wheelTimer_) { + wheelTimer_ = HHWheelTimer::newTimer(this); + } + return *wheelTimer_.get(); } // --------- interface to underlying libevent base ------------ @@ -532,21 +530,23 @@ class EventBase : private boost::noncopyable, * first handler fired within that cycle. * */ - bool bumpHandlingTime() override; + void bumpHandlingTime() final; class SmoothLoopTime { public: - explicit SmoothLoopTime(uint64_t timeInterval) - : expCoeff_(-1.0/timeInterval) - , value_(0.0) - , oldBusyLeftover_(0) { + explicit SmoothLoopTime(std::chrono::microseconds timeInterval) + : expCoeff_(-1.0 / timeInterval.count()), + value_(0.0), + oldBusyLeftover_(0) { VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__; } - void setTimeInterval(uint64_t timeInterval); + void setTimeInterval(std::chrono::microseconds timeInterval); void reset(double value = 0.0); - void addSample(int64_t idle, int64_t busy); + void addSample( + std::chrono::microseconds idle, + std::chrono::microseconds busy); double get() const { return value_; @@ -557,9 +557,9 @@ class EventBase : private boost::noncopyable, } private: - double expCoeff_; - double value_; - int64_t oldBusyLeftover_; + double expCoeff_; + double value_; + std::chrono::microseconds oldBusyLeftover_; }; void setObserver(const std::shared_ptr& observer) { @@ -602,65 +602,77 @@ class EventBase : private boost::noncopyable, void add(Cob fn) override { // runInEventBaseThread() takes a const&, // so no point in doing std::move here. - runInEventBaseThread(fn); + runInEventBaseThread(std::move(fn)); } /// Implements the DrivableExecutor interface void drive() override { + ++loopKeepAliveCount_; + SCOPE_EXIT { + --loopKeepAliveCount_; + }; loopOnce(); } - private: + /// Returns you a handle which make loop() behave like loopForever() until + /// destroyed. loop() will return to its original behavior only when all + /// loop keep-alives are released. + KeepAlive getKeepAliveToken() override { + keepAliveAcquire(); + return makeKeepAlive(); + } // TimeoutManager - void attachTimeoutManager(AsyncTimeout* obj, - TimeoutManager::InternalEnum internal) override; + void attachTimeoutManager( + AsyncTimeout* obj, + TimeoutManager::InternalEnum internal) final; - void detachTimeoutManager(AsyncTimeout* obj) override; + void detachTimeoutManager(AsyncTimeout* obj) final; bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout) - override; + final; - void cancelTimeout(AsyncTimeout* obj) override; + void cancelTimeout(AsyncTimeout* obj) final; - bool isInTimeoutManagerThread() override { + bool isInTimeoutManagerThread() final { return isInEventBaseThread(); } - // Helper class used to short circuit runInEventBaseThread - class RunInLoopCallback : public LoopCallback { - public: - RunInLoopCallback(void (*fn)(void*), void* arg); - void runLoopCallback() noexcept; + // Returns a VirtualEventBase attached to this EventBase. Can be used to + // pass to APIs which expect VirtualEventBase. This VirtualEventBase will be + // destroyed together with the EventBase. + // + // Any number of VirtualEventBases instances may be independently constructed, + // which are backed by this EventBase. This method should be only used if you + // don't need to manage the life time of the VirtualEventBase used. + folly::VirtualEventBase& getVirtualEventBase(); + + protected: + void keepAliveAcquire() override { + if (inRunningEventBaseThread()) { + loopKeepAliveCount_++; + } else { + loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed); + } + } - private: - void (*fn_)(void*); - void* arg_; - }; + void keepAliveRelease() override { + if (!inRunningEventBaseThread()) { + return add([=] { keepAliveRelease(); }); + } + loopKeepAliveCount_--; + } + + private: + void applyLoopKeepAlive(); + + ssize_t loopKeepAliveCount(); /* * Helper function that tells us whether we have already handled * some event/timeout/callback in this loop iteration. */ - bool nothingHandledYet(); - - // --------- libevent callbacks (not for client use) ------------ - - static void runFunctionPtr(std::function* fn); - - // small object used as a callback arg with enough info to execute the - // appropriate client-provided Cob - class CobTimeout : public HHWheelTimer::Callback { - public: - explicit CobTimeout(const Cob& c) : cob_(c) {} - - void timeoutExpired() noexcept override; - - void callbackCanceled() noexcept override; - - private: - Cob cob_; - }; + bool nothingHandledYet() const noexcept; typedef LoopCallback::List LoopCallbackList; class FunctionRunner; @@ -668,16 +680,16 @@ class EventBase : private boost::noncopyable, bool loopBody(int flags = 0); // executes any callbacks queued by runInLoop(); returns false if none found - bool runLoopCallbacks(bool setContext = true); + bool runLoopCallbacks(); void initNotificationQueue(); + // should only be accessed through public getter HHWheelTimer::UniquePtr wheelTimer_; LoopCallbackList loopCallbacks_; LoopCallbackList runBeforeLoopCallbacks_; LoopCallbackList onDestructionCallbacks_; - LoopCallbackList runAfterDrainCallbacks_; // This will be null most of the time, but point to currentCallbacks // if we are in the middle of running loop callbacks, such that @@ -690,22 +702,22 @@ class EventBase : private boost::noncopyable, std::atomic stop_; // The ID of the thread running the main loop. - // 0 if loop is not running. - // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or - // even that atomic is valid), but that's how it is - // everywhere (at least on Linux, FreeBSD, and OSX). - std::atomic loopThread_; + // std::thread::id{} if loop is not running. + std::atomic loopThread_; // pointer to underlying event_base class doing the heavy lifting event_base* evb_; // A notification queue for runInEventBaseThread() to use // to send function requests to the EventBase thread. - std::unique_ptr>> queue_; + std::unique_ptr> queue_; std::unique_ptr fnRunner_; + ssize_t loopKeepAliveCount_{0}; + std::atomic loopKeepAliveCountAtomic_{0}; + bool loopKeepAliveActive_{false}; // limit for latency in microseconds (0 disables) - int64_t maxLatency_; + std::chrono::microseconds maxLatency_; // exponentially-smoothed average loop time for latency-limiting SmoothLoopTime avgLoopTime_; @@ -716,21 +728,22 @@ class EventBase : private boost::noncopyable, SmoothLoopTime maxLatencyLoopTime_; // callback called when latency limit is exceeded - Cob maxLatencyCob_; + Func maxLatencyCob_; // Enables/disables time measurements in loopBody(). if disabled, the // following functionality that relies on time-measurement, will not // be supported: avg loop time, observer and max latency. const bool enableTimeMeasurement_; - // we'll wait this long before running deferred callbacks if the event - // loop is idle. - static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms - // Wrap-around loop counter to detect beginning of each loop uint64_t nextLoopCnt_; uint64_t latestLoopCnt_; - uint64_t startWork_; + std::chrono::steady_clock::time_point startWork_; + // Prevent undefined behavior from invoking event_base_loop() reentrantly. + // This is needed since many projects use libevent-1.4, which lacks commit + // b557b175c00dc462c1fce25f6e7dd67121d2c001 from + // https://github.com/libevent/libevent/. + bool invokingLoop_{false}; // Observer to export counters std::shared_ptr observer_; @@ -745,15 +758,31 @@ class EventBase : private boost::noncopyable, // allow runOnDestruction() to be called from any threads std::mutex onDestructionCallbacksMutex_; - // allow runAfterDrain() to be called from any threads - std::mutex runAfterDrainCallbacksMutex_; - // see EventBaseLocal friend class detail::EventBaseLocalBase; template friend class EventBaseLocal; - std::mutex localStorageMutex_; std::unordered_map> localStorage_; std::unordered_set localStorageToDtor_; + + folly::once_flag virtualEventBaseInitFlag_; + std::unique_ptr virtualEventBase_; }; -} // folly +template +bool EventBase::runInEventBaseThread(void (*fn)(T*), T* arg) { + return runInEventBaseThread([=] { fn(arg); }); +} + +template +bool EventBase::runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) { + return runInEventBaseThreadAndWait([=] { fn(arg); }); +} + +template +bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait( + void (*fn)(T*), + T* arg) { + return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); }); +} + +} // namespace folly