X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;ds=sidebyside;f=folly%2Fio%2Fasync%2FEventBase.h;h=3399cea2dacbdcc57af56f3318efd1837b65583e;hb=bda67fde120837b77ddab74f23abcb22ae5b3029;hp=909d965728e981262f5288258b02c271c9a47288;hpb=2a196d5a008f42da5bdf009d204f81593c2adaa4;p=folly.git diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index 909d9657..3399cea2 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -1,5 +1,5 @@ /* - * Copyright 2016 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,11 @@ #pragma once #include +#include +#include #include -#include #include #include -#include #include #include #include @@ -33,22 +33,24 @@ #include #include +#include #include #include #include +#include +#include #include -#include #include +#include #include #include -#include - -#include // libevent +#include +#include namespace folly { -typedef std::function Cob; +using Cob = Func; // defined in folly/Executor.h template class NotificationQueue; @@ -60,7 +62,7 @@ class EventBaseLocalBaseBase { virtual void onEventBaseDestruction(EventBase& evb) = 0; virtual ~EventBaseLocalBaseBase() = default; }; -} +} // namespace detail template class EventBaseLocal; @@ -93,12 +95,18 @@ class RequestEventBase : public RequestData { std::unique_ptr(new RequestEventBase(eb))); } + bool hasCallback() override { + return false; + } + private: explicit RequestEventBase(EventBase* eb) : eb_(eb) {} EventBase* eb_; static constexpr const char* kContextDataName{"EventBase"}; }; +class VirtualEventBase; + /** * This class is a wrapper for all asynchronous I/O processing functionality * @@ -134,36 +142,63 @@ class EventBase : private boost::noncopyable, * If a LoopCallback object is destroyed while it is scheduled to be run in * the next loop iteration, it will automatically be cancelled. */ - class LoopCallback { + class LoopCallback + : public boost::intrusive::list_base_hook< + boost::intrusive::link_mode> { public: virtual ~LoopCallback() = default; virtual void runLoopCallback() noexcept = 0; void cancelLoopCallback() { - hook_.unlink(); + context_.reset(); + unlink(); } bool isLoopCallbackScheduled() const { - return hook_.is_linked(); + return is_linked(); } private: - typedef boost::intrusive::list_member_hook< - boost::intrusive::link_mode > ListHook; - - ListHook hook_; - typedef boost::intrusive::list< LoopCallback, - boost::intrusive::member_hook, boost::intrusive::constant_time_size > List; // EventBase needs access to LoopCallbackList (and therefore to hook_) friend class EventBase; + friend class VirtualEventBase; std::shared_ptr context_; }; + class FunctionLoopCallback : public LoopCallback { + public: + explicit FunctionLoopCallback(Func&& function) + : function_(std::move(function)) {} + + void runLoopCallback() noexcept override { + function_(); + delete this; + } + + private: + Func function_; + }; + + // Like FunctionLoopCallback, but saves one allocation. Use with caution. + // + // The caller is responsible for maintaining the lifetime of this callback + // until after the point at which the contained function is called. + class StackFunctionLoopCallback : public LoopCallback { + public: + explicit StackFunctionLoopCallback(Func&& function) + : function_(std::move(function)) {} + void runLoopCallback() noexcept override { + Func(std::move(function_))(); + } + + private: + Func function_; + }; + /** * Create a new EventBase object. * @@ -189,7 +224,7 @@ class EventBase : private boost::noncopyable, * observer, max latency and avg loop time. */ explicit EventBase(event_base* evb, bool enableTimeMeasurement = true); - ~EventBase(); + ~EventBase() override; /** * Runs the event loop. @@ -294,13 +329,12 @@ class EventBase : private boost::noncopyable, void runInLoop(LoopCallback* callback, bool thisIteration = false); /** - * Convenience function to call runInLoop() with a std::function. + * Convenience function to call runInLoop() with a folly::Function. * - * This creates a LoopCallback object to wrap the std::function, and invoke - * the std::function when the loop callback fires. This is slightly more + * This creates a LoopCallback object to wrap the folly::Function, and invoke + * the folly::Function when the loop callback fires. This is slightly more * expensive than defining your own LoopCallback, but more convenient in - * areas that aren't performance sensitive where you just want to use - * std::bind. (std::bind is fairly slow on even by itself.) + * areas that aren't too performance sensitive. * * This method may only be called from the EventBase's thread. This * essentially allows an event handler to schedule an additional callback to @@ -322,15 +356,6 @@ class EventBase : private boost::noncopyable, */ void runOnDestruction(LoopCallback* callback); - /** - * Adds the given callback to a queue of things run after the notification - * queue is drained before the destruction of current EventBase. - * - * Note: will be called from the thread that invoked EventBase destructor, - * after the final run of loop callbacks. - */ - void runAfterDrain(Func cob); - /** * Adds a callback that will run immediately *before* the event loop. * This is very similar to runInLoop(), but will not cause the loop to break: @@ -370,10 +395,11 @@ class EventBase : private boost::noncopyable, /** * Run the specified function in the EventBase's thread * - * This version of runInEventBaseThread() takes a std::function object. - * Note that this is less efficient than the version that takes a plain - * function pointer and void* argument, as it has to allocate memory to copy - * the std::function object. + * This version of runInEventBaseThread() takes a folly::Function object. + * Note that this may be less efficient than the version that takes a plain + * function pointer and void* argument, if moving the function is expensive + * (e.g., if it wraps a lambda which captures some values with expensive move + * constructors). * * If the loop is terminated (and never later restarted) before it has a * chance to run the requested function, the function will be run upon the @@ -409,45 +435,22 @@ class EventBase : private boost::noncopyable, */ bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn); - /** - * Runs the given Cob at some time after the specified number of - * milliseconds. (No guarantees exactly when.) - * - * Throws a std::system_error if an error occurs. - */ - void runAfterDelay( - Func c, - uint32_t milliseconds, - TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL); - - /** - * @see tryRunAfterDelay for more details - * - * @return true iff the cob was successfully registered. - * - * */ - bool tryRunAfterDelay( - Func cob, - uint32_t milliseconds, - TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL); - /** * Set the maximum desired latency in us and provide a callback which will be * called when that latency is exceeded. * OBS: This functionality depends on time-measurement. */ - void setMaxLatency(int64_t maxLatency, Func maxLatencyCob) { + void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob) { assert(enableTimeMeasurement_); maxLatency_ = maxLatency; maxLatencyCob_ = std::move(maxLatencyCob); } - /** * Set smoothing coefficient for loop load average; # of milliseconds * for exp(-1) (1/2.71828...) decay. */ - void setLoadAvgMsec(uint32_t ms); + void setLoadAvgMsec(std::chrono::milliseconds ms); /** * reset the load average to a desired value @@ -466,7 +469,7 @@ class EventBase : private boost::noncopyable, * check if the event base loop is running. */ bool isRunning() const { - return loopThread_.load(std::memory_order_relaxed) != 0; + return loopThread_.load(std::memory_order_relaxed) != std::thread::id(); } /** @@ -474,7 +477,7 @@ class EventBase : private boost::noncopyable, */ void waitUntilRunning(); - int getNotificationQueueSize() const; + size_t getNotificationQueueSize() const; void setMaxReadAtOnce(uint32_t maxAtOnce); @@ -484,12 +487,31 @@ class EventBase : private boost::noncopyable, */ bool isInEventBaseThread() const { auto tid = loopThread_.load(std::memory_order_relaxed); - return tid == 0 || pthread_equal(tid, pthread_self()); + return tid == std::thread::id() || tid == std::this_thread::get_id(); } bool inRunningEventBaseThread() const { - return pthread_equal( - loopThread_.load(std::memory_order_relaxed), pthread_self()); + return loopThread_.load(std::memory_order_relaxed) == + std::this_thread::get_id(); + } + + /** + * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for + * dcheckIsInEventBaseThread), but it prints more information on + * failure. + */ + void checkIsInEventBaseThread() const; + void dcheckIsInEventBaseThread() const { + if (kIsDebug) { + checkIsInEventBaseThread(); + } + } + + HHWheelTimer& timer() { + if (!wheelTimer_) { + wheelTimer_ = HHWheelTimer::newTimer(this); + } + return *wheelTimer_.get(); } // --------- interface to underlying libevent base ------------ @@ -508,21 +530,23 @@ class EventBase : private boost::noncopyable, * first handler fired within that cycle. * */ - bool bumpHandlingTime() override; + void bumpHandlingTime() final; class SmoothLoopTime { public: - explicit SmoothLoopTime(uint64_t timeInterval) - : expCoeff_(-1.0/timeInterval) - , value_(0.0) - , oldBusyLeftover_(0) { + explicit SmoothLoopTime(std::chrono::microseconds timeInterval) + : expCoeff_(-1.0 / timeInterval.count()), + value_(0.0), + oldBusyLeftover_(0) { VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__; } - void setTimeInterval(uint64_t timeInterval); + void setTimeInterval(std::chrono::microseconds timeInterval); void reset(double value = 0.0); - void addSample(int64_t idle, int64_t busy); + void addSample( + std::chrono::microseconds idle, + std::chrono::microseconds busy); double get() const { return value_; @@ -533,9 +557,9 @@ class EventBase : private boost::noncopyable, } private: - double expCoeff_; - double value_; - int64_t oldBusyLeftover_; + double expCoeff_; + double value_; + std::chrono::microseconds oldBusyLeftover_; }; void setObserver(const std::shared_ptr& observer) { @@ -583,58 +607,72 @@ class EventBase : private boost::noncopyable, /// Implements the DrivableExecutor interface void drive() override { + ++loopKeepAliveCount_; + SCOPE_EXIT { + --loopKeepAliveCount_; + }; loopOnce(); } - private: + /// Returns you a handle which make loop() behave like loopForever() until + /// destroyed. loop() will return to its original behavior only when all + /// loop keep-alives are released. + KeepAlive getKeepAliveToken() override { + keepAliveAcquire(); + return makeKeepAlive(); + } + // TimeoutManager - void attachTimeoutManager(AsyncTimeout* obj, - TimeoutManager::InternalEnum internal) override; + void attachTimeoutManager( + AsyncTimeout* obj, + TimeoutManager::InternalEnum internal) final; - void detachTimeoutManager(AsyncTimeout* obj) override; + void detachTimeoutManager(AsyncTimeout* obj) final; bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout) - override; + final; - void cancelTimeout(AsyncTimeout* obj) override; + void cancelTimeout(AsyncTimeout* obj) final; - bool isInTimeoutManagerThread() override { + bool isInTimeoutManagerThread() final { return isInEventBaseThread(); } - /* - * Helper function that tells us whether we have already handled - * some event/timeout/callback in this loop iteration. - */ - bool nothingHandledYet(); - - // --------- libevent callbacks (not for client use) ------------ - - static void runFunctionPtr(Cob* fn); - - // small object used as a callback arg with enough info to execute the - // appropriate client-provided Cob - class CobTimeout : public AsyncTimeout { - public: - CobTimeout(EventBase* b, Func c, TimeoutManager::InternalEnum in) - : AsyncTimeout(b, in), cob_(std::move(c)) {} - - virtual void timeoutExpired() noexcept; + // Returns a VirtualEventBase attached to this EventBase. Can be used to + // pass to APIs which expect VirtualEventBase. This VirtualEventBase will be + // destroyed together with the EventBase. + // + // Any number of VirtualEventBases instances may be independently constructed, + // which are backed by this EventBase. This method should be only used if you + // don't need to manage the life time of the VirtualEventBase used. + folly::VirtualEventBase& getVirtualEventBase(); + + protected: + void keepAliveAcquire() override { + if (inRunningEventBaseThread()) { + loopKeepAliveCount_++; + } else { + loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed); + } + } - private: - Func cob_; + void keepAliveRelease() override { + if (!inRunningEventBaseThread()) { + return add([=] { keepAliveRelease(); }); + } + loopKeepAliveCount_--; + } - public: - typedef boost::intrusive::list_member_hook< - boost::intrusive::link_mode > ListHook; + private: + void applyLoopKeepAlive(); - ListHook hook; + ssize_t loopKeepAliveCount(); - typedef boost::intrusive::list< - CobTimeout, - boost::intrusive::member_hook, - boost::intrusive::constant_time_size > List; - }; + /* + * Helper function that tells us whether we have already handled + * some event/timeout/callback in this loop iteration. + */ + bool nothingHandledYet() const noexcept; typedef LoopCallback::List LoopCallbackList; class FunctionRunner; @@ -642,16 +680,16 @@ class EventBase : private boost::noncopyable, bool loopBody(int flags = 0); // executes any callbacks queued by runInLoop(); returns false if none found - bool runLoopCallbacks(bool setContext = true); + bool runLoopCallbacks(); void initNotificationQueue(); - CobTimeout::List pendingCobTimeouts_; + // should only be accessed through public getter + HHWheelTimer::UniquePtr wheelTimer_; LoopCallbackList loopCallbacks_; LoopCallbackList runBeforeLoopCallbacks_; LoopCallbackList onDestructionCallbacks_; - LoopCallbackList runAfterDrainCallbacks_; // This will be null most of the time, but point to currentCallbacks // if we are in the middle of running loop callbacks, such that @@ -664,11 +702,8 @@ class EventBase : private boost::noncopyable, std::atomic stop_; // The ID of the thread running the main loop. - // 0 if loop is not running. - // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or - // even that atomic is valid), but that's how it is - // everywhere (at least on Linux, FreeBSD, and OSX). - std::atomic loopThread_; + // std::thread::id{} if loop is not running. + std::atomic loopThread_; // pointer to underlying event_base class doing the heavy lifting event_base* evb_; @@ -677,9 +712,12 @@ class EventBase : private boost::noncopyable, // to send function requests to the EventBase thread. std::unique_ptr> queue_; std::unique_ptr fnRunner_; + ssize_t loopKeepAliveCount_{0}; + std::atomic loopKeepAliveCountAtomic_{0}; + bool loopKeepAliveActive_{false}; // limit for latency in microseconds (0 disables) - int64_t maxLatency_; + std::chrono::microseconds maxLatency_; // exponentially-smoothed average loop time for latency-limiting SmoothLoopTime avgLoopTime_; @@ -697,14 +735,15 @@ class EventBase : private boost::noncopyable, // be supported: avg loop time, observer and max latency. const bool enableTimeMeasurement_; - // we'll wait this long before running deferred callbacks if the event - // loop is idle. - static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms - // Wrap-around loop counter to detect beginning of each loop uint64_t nextLoopCnt_; uint64_t latestLoopCnt_; - uint64_t startWork_; + std::chrono::steady_clock::time_point startWork_; + // Prevent undefined behavior from invoking event_base_loop() reentrantly. + // This is needed since many projects use libevent-1.4, which lacks commit + // b557b175c00dc462c1fce25f6e7dd67121d2c001 from + // https://github.com/libevent/libevent/. + bool invokingLoop_{false}; // Observer to export counters std::shared_ptr observer_; @@ -719,91 +758,31 @@ class EventBase : private boost::noncopyable, // allow runOnDestruction() to be called from any threads std::mutex onDestructionCallbacksMutex_; - // allow runAfterDrain() to be called from any threads - std::mutex runAfterDrainCallbacksMutex_; - // see EventBaseLocal friend class detail::EventBaseLocalBase; template friend class EventBaseLocal; - std::mutex localStorageMutex_; std::unordered_map> localStorage_; std::unordered_set localStorageToDtor_; -}; - -namespace detail { -/** - * Define a small functor (2 pointers) and specialize - * std::__is_location_invariant so that std::function does not require - * memory allocation. - * - * std::function func = SmallFunctor{f, p}; - * - * TODO(lucian): remove this hack once GCC <= 4.9 are deprecated. - * In GCC >= 5.0 just use a lambda like: - * - * std::function func = [=] { f(p); }; - * - * See https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61909 - */ -template -struct SmallFunctor { - void (*fn)(T*); - T* p; - void operator()() { fn(p); } + folly::once_flag virtualEventBaseInitFlag_; + std::unique_ptr virtualEventBase_; }; -} // detail - template bool EventBase::runInEventBaseThread(void (*fn)(T*), T* arg) { - return runInEventBaseThread(detail::SmallFunctor{fn, arg}); + return runInEventBaseThread([=] { fn(arg); }); } template bool EventBase::runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) { - return runInEventBaseThreadAndWait(detail::SmallFunctor{fn, arg}); + return runInEventBaseThreadAndWait([=] { fn(arg); }); } template -bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(void (*fn)(T*), - T* arg) { - return runImmediatelyOrRunInEventBaseThreadAndWait( - detail::SmallFunctor{fn, arg}); +bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait( + void (*fn)(T*), + T* arg) { + return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); }); } -} // folly - -FOLLY_NAMESPACE_STD_BEGIN - -/** - * GCC's libstdc++ uses __is_location_invariant to decide wether to - * use small object optimization and embed the functor's contents in - * the std::function object. - * - * (gcc 4.9) $ libstdc++-v3/include/std/functional - * template - * struct __is_location_invariant - * : integral_constant::value - * || is_member_pointer<_Tp>::value)> - * { }; - * - * (gcc 5.0) $ libstdc++-v3/include/std/functional - * - * template - * struct __is_location_invariant - * : is_trivially_copyable<_Tp>::type - * { }; - * - * - * NOTE: Forward declare so this doesn't break when using other - * standard libraries: it just wont have any effect. - */ -template -struct __is_location_invariant; - -template -struct __is_location_invariant> - : public std::true_type {}; - -FOLLY_NAMESPACE_STD_END +} // namespace folly