X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2Fasync%2FEventBase.h;h=c318d609ef6c3ada5e04865cf7724ece4a168df7;hp=b450e97fee40762ed82c4ea83cd4797659fc1ee2;hb=53367378d6e5455dfbc649b325cfa583c54227ac;hpb=6e8fcd168c0edf827e6f828bf81f52b661e8d17c diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index b450e97f..c318d609 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -1,5 +1,5 @@ /* - * Copyright 2016 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,20 +37,20 @@ #include #include #include +#include #include #include #include #include #include #include -#include #include #include // libevent namespace folly { -typedef std::function Cob; +using Cob = Func; // defined in folly/Executor.h template class NotificationQueue; @@ -101,6 +101,8 @@ class RequestEventBase : public RequestData { static constexpr const char* kContextDataName{"EventBase"}; }; +class VirtualEventBase; + /** * This class is a wrapper for all asynchronous I/O processing functionality * @@ -124,6 +126,7 @@ class EventBase : private boost::noncopyable, public DrivableExecutor { public: using Func = folly::Function; + using FuncRef = folly::FunctionRef; /** * A callback interface to use with runInLoop() @@ -136,36 +139,62 @@ class EventBase : private boost::noncopyable, * If a LoopCallback object is destroyed while it is scheduled to be run in * the next loop iteration, it will automatically be cancelled. */ - class LoopCallback { + class LoopCallback + : public boost::intrusive::list_base_hook< + boost::intrusive::link_mode> { public: virtual ~LoopCallback() = default; virtual void runLoopCallback() noexcept = 0; void cancelLoopCallback() { - hook_.unlink(); + unlink(); } bool isLoopCallbackScheduled() const { - return hook_.is_linked(); + return is_linked(); } private: - typedef boost::intrusive::list_member_hook< - boost::intrusive::link_mode > ListHook; - - ListHook hook_; - typedef boost::intrusive::list< LoopCallback, - boost::intrusive::member_hook, boost::intrusive::constant_time_size > List; // EventBase needs access to LoopCallbackList (and therefore to hook_) friend class EventBase; + friend class VirtualEventBase; std::shared_ptr context_; }; + class FunctionLoopCallback : public LoopCallback { + public: + explicit FunctionLoopCallback(Func&& function) + : function_(std::move(function)) {} + + void runLoopCallback() noexcept override { + function_(); + delete this; + } + + private: + Func function_; + }; + + // Like FunctionLoopCallback, but saves one allocation. Use with caution. + // + // The caller is responsible for maintaining the lifetime of this callback + // until after the point at which the contained function is called. + class StackFunctionLoopCallback : public LoopCallback { + public: + explicit StackFunctionLoopCallback(Func&& function) + : function_(std::move(function)) {} + void runLoopCallback() noexcept override { + Func(std::move(function_))(); + } + + private: + Func function_; + }; + /** * Create a new EventBase object. * @@ -323,15 +352,6 @@ class EventBase : private boost::noncopyable, */ void runOnDestruction(LoopCallback* callback); - /** - * Adds the given callback to a queue of things run after the notification - * queue is drained before the destruction of current EventBase. - * - * Note: will be called from the thread that invoked EventBase destructor, - * after the final run of loop callbacks. - */ - void runAfterDrain(Func cob); - /** * Adds a callback that will run immediately *before* the event loop. * This is very similar to runInLoop(), but will not cause the loop to break: @@ -396,7 +416,7 @@ class EventBase : private boost::noncopyable, * Like runInEventBaseThread, but the caller waits for the callback to be * executed. */ - bool runInEventBaseThreadAndWait(Func fn); + bool runInEventBaseThreadAndWait(FuncRef fn); /* * Like runInEventBaseThreadAndWait, except if the caller is already in the @@ -409,47 +429,24 @@ class EventBase : private boost::noncopyable, * Like runInEventBaseThreadAndWait, except if the caller is already in the * event base thread, the functor is simply run inline. */ - bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn); - - /** - * Runs the given Cob at some time after the specified number of - * milliseconds. (No guarantees exactly when.) - * - * Throws a std::system_error if an error occurs. - */ - void runAfterDelay( - Func c, - uint32_t milliseconds, - TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL); - - /** - * @see tryRunAfterDelay for more details - * - * @return true iff the cob was successfully registered. - * - * */ - bool tryRunAfterDelay( - Func cob, - uint32_t milliseconds, - TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL); + bool runImmediatelyOrRunInEventBaseThreadAndWait(FuncRef fn); /** * Set the maximum desired latency in us and provide a callback which will be * called when that latency is exceeded. * OBS: This functionality depends on time-measurement. */ - void setMaxLatency(int64_t maxLatency, Func maxLatencyCob) { + void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob) { assert(enableTimeMeasurement_); maxLatency_ = maxLatency; maxLatencyCob_ = std::move(maxLatencyCob); } - /** * Set smoothing coefficient for loop load average; # of milliseconds * for exp(-1) (1/2.71828...) decay. */ - void setLoadAvgMsec(uint32_t ms); + void setLoadAvgMsec(std::chrono::milliseconds ms); /** * reset the load average to a desired value @@ -468,7 +465,7 @@ class EventBase : private boost::noncopyable, * check if the event base loop is running. */ bool isRunning() const { - return loopThread_.load(std::memory_order_relaxed) != 0; + return loopThread_.load(std::memory_order_relaxed) != std::thread::id(); } /** @@ -476,7 +473,7 @@ class EventBase : private boost::noncopyable, */ void waitUntilRunning(); - int getNotificationQueueSize() const; + size_t getNotificationQueueSize() const; void setMaxReadAtOnce(uint32_t maxAtOnce); @@ -486,12 +483,12 @@ class EventBase : private boost::noncopyable, */ bool isInEventBaseThread() const { auto tid = loopThread_.load(std::memory_order_relaxed); - return tid == 0 || pthread_equal(tid, pthread_self()); + return tid == std::thread::id() || tid == std::this_thread::get_id(); } bool inRunningEventBaseThread() const { - return pthread_equal( - loopThread_.load(std::memory_order_relaxed), pthread_self()); + return loopThread_.load(std::memory_order_relaxed) == + std::this_thread::get_id(); } HHWheelTimer& timer() { @@ -521,17 +518,19 @@ class EventBase : private boost::noncopyable, class SmoothLoopTime { public: - explicit SmoothLoopTime(uint64_t timeInterval) - : expCoeff_(-1.0/timeInterval) - , value_(0.0) - , oldBusyLeftover_(0) { + explicit SmoothLoopTime(std::chrono::microseconds timeInterval) + : expCoeff_(-1.0 / timeInterval.count()), + value_(0.0), + oldBusyLeftover_(0) { VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__; } - void setTimeInterval(uint64_t timeInterval); + void setTimeInterval(std::chrono::microseconds timeInterval); void reset(double value = 0.0); - void addSample(int64_t idle, int64_t busy); + void addSample( + std::chrono::microseconds idle, + std::chrono::microseconds busy); double get() const { return value_; @@ -542,9 +541,9 @@ class EventBase : private boost::noncopyable, } private: - double expCoeff_; - double value_; - int64_t oldBusyLeftover_; + double expCoeff_; + double value_; + std::chrono::microseconds oldBusyLeftover_; }; void setObserver(const std::shared_ptr& observer) { @@ -592,78 +591,61 @@ class EventBase : private boost::noncopyable, /// Implements the DrivableExecutor interface void drive() override { - auto keepAlive = loopKeepAlive(); + // We can't use loopKeepAlive() here since LoopKeepAlive token can only be + // released inside a loop. + ++loopKeepAliveCount_; + SCOPE_EXIT { + --loopKeepAliveCount_; + }; loopOnce(); } - struct LoopKeepAliveDeleter { - void operator()(EventBase* evb) { - DCHECK(evb->isInEventBaseThread()); - evb->loopKeepAliveCount_--; - } - }; - using LoopKeepAlive = std::unique_ptr; - /// Returns you a handle which make loop() behave like loopForever() until /// destroyed. loop() will return to its original behavior only when all /// loop keep-alives are released. Loop holder is safe to release only from /// EventBase thread. - /// - /// May return no op LoopKeepAlive if loopForever() is already running. - LoopKeepAlive loopKeepAlive() { - DCHECK(isInEventBaseThread()); - loopKeepAliveCount_++; - return LoopKeepAlive(this); + KeepAlive getKeepAliveToken() override { + if (inRunningEventBaseThread()) { + loopKeepAliveCount_++; + } else { + loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed); + } + return makeKeepAlive(); } - private: // TimeoutManager - void attachTimeoutManager(AsyncTimeout* obj, - TimeoutManager::InternalEnum internal) override; + void attachTimeoutManager( + AsyncTimeout* obj, + TimeoutManager::InternalEnum internal) override final; - void detachTimeoutManager(AsyncTimeout* obj) override; + void detachTimeoutManager(AsyncTimeout* obj) override final; bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout) - override; + override final; - void cancelTimeout(AsyncTimeout* obj) override; + void cancelTimeout(AsyncTimeout* obj) override final; bool isInTimeoutManagerThread() override final { return isInEventBaseThread(); } + protected: + void keepAliveRelease() override { + DCHECK(isInEventBaseThread()); + loopKeepAliveCount_--; + } + + private: void applyLoopKeepAlive(); + ssize_t loopKeepAliveCount(); + /* * Helper function that tells us whether we have already handled * some event/timeout/callback in this loop iteration. */ bool nothingHandledYet() const noexcept; - // small object used as a callback arg with enough info to execute the - // appropriate client-provided Cob - class CobTimeout : public AsyncTimeout { - public: - CobTimeout(EventBase* b, Func c, TimeoutManager::InternalEnum in) - : AsyncTimeout(b, in), cob_(std::move(c)) {} - - virtual void timeoutExpired() noexcept; - - private: - Func cob_; - - public: - typedef boost::intrusive::list_member_hook< - boost::intrusive::link_mode > ListHook; - - ListHook hook; - - typedef boost::intrusive::list< - CobTimeout, - boost::intrusive::member_hook, - boost::intrusive::constant_time_size > List; - }; - typedef LoopCallback::List LoopCallbackList; class FunctionRunner; @@ -677,12 +659,9 @@ class EventBase : private boost::noncopyable, // should only be accessed through public getter HHWheelTimer::UniquePtr wheelTimer_; - CobTimeout::List pendingCobTimeouts_; - LoopCallbackList loopCallbacks_; LoopCallbackList runBeforeLoopCallbacks_; LoopCallbackList onDestructionCallbacks_; - LoopCallbackList runAfterDrainCallbacks_; // This will be null most of the time, but point to currentCallbacks // if we are in the middle of running loop callbacks, such that @@ -695,11 +674,8 @@ class EventBase : private boost::noncopyable, std::atomic stop_; // The ID of the thread running the main loop. - // 0 if loop is not running. - // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or - // even that atomic is valid), but that's how it is - // everywhere (at least on Linux, FreeBSD, and OSX). - std::atomic loopThread_; + // std::thread::id{} if loop is not running. + std::atomic loopThread_; // pointer to underlying event_base class doing the heavy lifting event_base* evb_; @@ -708,11 +684,12 @@ class EventBase : private boost::noncopyable, // to send function requests to the EventBase thread. std::unique_ptr> queue_; std::unique_ptr fnRunner_; - size_t loopKeepAliveCount_{0}; + ssize_t loopKeepAliveCount_{0}; + std::atomic loopKeepAliveCountAtomic_{0}; bool loopKeepAliveActive_{false}; // limit for latency in microseconds (0 disables) - int64_t maxLatency_; + std::chrono::microseconds maxLatency_; // exponentially-smoothed average loop time for latency-limiting SmoothLoopTime avgLoopTime_; @@ -730,14 +707,10 @@ class EventBase : private boost::noncopyable, // be supported: avg loop time, observer and max latency. const bool enableTimeMeasurement_; - // we'll wait this long before running deferred callbacks if the event - // loop is idle. - static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms - // Wrap-around loop counter to detect beginning of each loop uint64_t nextLoopCnt_; uint64_t latestLoopCnt_; - uint64_t startWork_; + std::chrono::steady_clock::time_point startWork_; // Prevent undefined behavior from invoking event_base_loop() reentrantly. // This is needed since many projects use libevent-1.4, which lacks commit // b557b175c00dc462c1fce25f6e7dd67121d2c001 from @@ -757,9 +730,6 @@ class EventBase : private boost::noncopyable, // allow runOnDestruction() to be called from any threads std::mutex onDestructionCallbacksMutex_; - // allow runAfterDrain() to be called from any threads - std::mutex runAfterDrainCallbacksMutex_; - // see EventBaseLocal friend class detail::EventBaseLocalBase; template friend class EventBaseLocal;