X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2Fasync%2FEventBase.h;h=3399cea2dacbdcc57af56f3318efd1837b65583e;hb=bda67fde120837b77ddab74f23abcb22ae5b3029;hp=16d406eaa898c205bbcb5f05a2487fdd1d21b713;hpb=ad0f7b2703ffc886a13ec5b420a5282cda380ae1;p=folly.git diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index 16d406ea..3399cea2 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -1,5 +1,5 @@ /* - * Copyright 2016 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,11 @@ #pragma once #include +#include +#include #include -#include #include #include -#include #include #include #include @@ -33,20 +33,20 @@ #include #include +#include #include #include #include +#include +#include #include -#include #include #include #include #include -#include -#include - -#include // libevent +#include +#include namespace folly { @@ -62,7 +62,7 @@ class EventBaseLocalBaseBase { virtual void onEventBaseDestruction(EventBase& evb) = 0; virtual ~EventBaseLocalBaseBase() = default; }; -} +} // namespace detail template class EventBaseLocal; @@ -95,12 +95,18 @@ class RequestEventBase : public RequestData { std::unique_ptr(new RequestEventBase(eb))); } + bool hasCallback() override { + return false; + } + private: explicit RequestEventBase(EventBase* eb) : eb_(eb) {} EventBase* eb_; static constexpr const char* kContextDataName{"EventBase"}; }; +class VirtualEventBase; + /** * This class is a wrapper for all asynchronous I/O processing functionality * @@ -144,6 +150,7 @@ class EventBase : private boost::noncopyable, virtual void runLoopCallback() noexcept = 0; void cancelLoopCallback() { + context_.reset(); unlink(); } @@ -158,9 +165,40 @@ class EventBase : private boost::noncopyable, // EventBase needs access to LoopCallbackList (and therefore to hook_) friend class EventBase; + friend class VirtualEventBase; std::shared_ptr context_; }; + class FunctionLoopCallback : public LoopCallback { + public: + explicit FunctionLoopCallback(Func&& function) + : function_(std::move(function)) {} + + void runLoopCallback() noexcept override { + function_(); + delete this; + } + + private: + Func function_; + }; + + // Like FunctionLoopCallback, but saves one allocation. Use with caution. + // + // The caller is responsible for maintaining the lifetime of this callback + // until after the point at which the contained function is called. + class StackFunctionLoopCallback : public LoopCallback { + public: + explicit StackFunctionLoopCallback(Func&& function) + : function_(std::move(function)) {} + void runLoopCallback() noexcept override { + Func(std::move(function_))(); + } + + private: + Func function_; + }; + /** * Create a new EventBase object. * @@ -186,7 +224,7 @@ class EventBase : private boost::noncopyable, * observer, max latency and avg loop time. */ explicit EventBase(event_base* evb, bool enableTimeMeasurement = true); - ~EventBase(); + ~EventBase() override; /** * Runs the event loop. @@ -397,45 +435,22 @@ class EventBase : private boost::noncopyable, */ bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn); - /** - * Runs the given Cob at some time after the specified number of - * milliseconds. (No guarantees exactly when.) - * - * Throws a std::system_error if an error occurs. - */ - void runAfterDelay( - Func c, - uint32_t milliseconds, - TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL); - - /** - * @see tryRunAfterDelay for more details - * - * @return true iff the cob was successfully registered. - * - * */ - bool tryRunAfterDelay( - Func cob, - uint32_t milliseconds, - TimeoutManager::InternalEnum in = TimeoutManager::InternalEnum::NORMAL); - /** * Set the maximum desired latency in us and provide a callback which will be * called when that latency is exceeded. * OBS: This functionality depends on time-measurement. */ - void setMaxLatency(int64_t maxLatency, Func maxLatencyCob) { + void setMaxLatency(std::chrono::microseconds maxLatency, Func maxLatencyCob) { assert(enableTimeMeasurement_); maxLatency_ = maxLatency; maxLatencyCob_ = std::move(maxLatencyCob); } - /** * Set smoothing coefficient for loop load average; # of milliseconds * for exp(-1) (1/2.71828...) decay. */ - void setLoadAvgMsec(uint32_t ms); + void setLoadAvgMsec(std::chrono::milliseconds ms); /** * reset the load average to a desired value @@ -454,7 +469,7 @@ class EventBase : private boost::noncopyable, * check if the event base loop is running. */ bool isRunning() const { - return loopThread_.load(std::memory_order_relaxed) != 0; + return loopThread_.load(std::memory_order_relaxed) != std::thread::id(); } /** @@ -462,7 +477,7 @@ class EventBase : private boost::noncopyable, */ void waitUntilRunning(); - int getNotificationQueueSize() const; + size_t getNotificationQueueSize() const; void setMaxReadAtOnce(uint32_t maxAtOnce); @@ -472,12 +487,24 @@ class EventBase : private boost::noncopyable, */ bool isInEventBaseThread() const { auto tid = loopThread_.load(std::memory_order_relaxed); - return tid == 0 || pthread_equal(tid, pthread_self()); + return tid == std::thread::id() || tid == std::this_thread::get_id(); } bool inRunningEventBaseThread() const { - return pthread_equal( - loopThread_.load(std::memory_order_relaxed), pthread_self()); + return loopThread_.load(std::memory_order_relaxed) == + std::this_thread::get_id(); + } + + /** + * Equivalent to CHECK(isInEventBaseThread()) (and assert/DCHECK for + * dcheckIsInEventBaseThread), but it prints more information on + * failure. + */ + void checkIsInEventBaseThread() const; + void dcheckIsInEventBaseThread() const { + if (kIsDebug) { + checkIsInEventBaseThread(); + } } HHWheelTimer& timer() { @@ -503,21 +530,23 @@ class EventBase : private boost::noncopyable, * first handler fired within that cycle. * */ - void bumpHandlingTime() override final; + void bumpHandlingTime() final; class SmoothLoopTime { public: - explicit SmoothLoopTime(uint64_t timeInterval) - : expCoeff_(-1.0/timeInterval) - , value_(0.0) - , oldBusyLeftover_(0) { + explicit SmoothLoopTime(std::chrono::microseconds timeInterval) + : expCoeff_(-1.0 / timeInterval.count()), + value_(0.0), + oldBusyLeftover_(0) { VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__; } - void setTimeInterval(uint64_t timeInterval); + void setTimeInterval(std::chrono::microseconds timeInterval); void reset(double value = 0.0); - void addSample(int64_t idle, int64_t busy); + void addSample( + std::chrono::microseconds idle, + std::chrono::microseconds busy); double get() const { return value_; @@ -528,9 +557,9 @@ class EventBase : private boost::noncopyable, } private: - double expCoeff_; - double value_; - int64_t oldBusyLeftover_; + double expCoeff_; + double value_; + std::chrono::microseconds oldBusyLeftover_; }; void setObserver(const std::shared_ptr& observer) { @@ -578,78 +607,73 @@ class EventBase : private boost::noncopyable, /// Implements the DrivableExecutor interface void drive() override { - auto keepAlive = loopKeepAlive(); + ++loopKeepAliveCount_; + SCOPE_EXIT { + --loopKeepAliveCount_; + }; loopOnce(); } - struct LoopKeepAliveDeleter { - void operator()(EventBase* evb) { - DCHECK(evb->isInEventBaseThread()); - evb->loopKeepAliveCount_--; - } - }; - using LoopKeepAlive = std::unique_ptr; - /// Returns you a handle which make loop() behave like loopForever() until /// destroyed. loop() will return to its original behavior only when all - /// loop keep-alives are released. Loop holder is safe to release only from - /// EventBase thread. - /// - /// May return no op LoopKeepAlive if loopForever() is already running. - LoopKeepAlive loopKeepAlive() { - DCHECK(isInEventBaseThread()); - loopKeepAliveCount_++; - return LoopKeepAlive(this); + /// loop keep-alives are released. + KeepAlive getKeepAliveToken() override { + keepAliveAcquire(); + return makeKeepAlive(); } - private: // TimeoutManager - void attachTimeoutManager(AsyncTimeout* obj, - TimeoutManager::InternalEnum internal) override; + void attachTimeoutManager( + AsyncTimeout* obj, + TimeoutManager::InternalEnum internal) final; - void detachTimeoutManager(AsyncTimeout* obj) override; + void detachTimeoutManager(AsyncTimeout* obj) final; bool scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout) - override; + final; - void cancelTimeout(AsyncTimeout* obj) override; + void cancelTimeout(AsyncTimeout* obj) final; - bool isInTimeoutManagerThread() override final { + bool isInTimeoutManagerThread() final { return isInEventBaseThread(); } + // Returns a VirtualEventBase attached to this EventBase. Can be used to + // pass to APIs which expect VirtualEventBase. This VirtualEventBase will be + // destroyed together with the EventBase. + // + // Any number of VirtualEventBases instances may be independently constructed, + // which are backed by this EventBase. This method should be only used if you + // don't need to manage the life time of the VirtualEventBase used. + folly::VirtualEventBase& getVirtualEventBase(); + + protected: + void keepAliveAcquire() override { + if (inRunningEventBaseThread()) { + loopKeepAliveCount_++; + } else { + loopKeepAliveCountAtomic_.fetch_add(1, std::memory_order_relaxed); + } + } + + void keepAliveRelease() override { + if (!inRunningEventBaseThread()) { + return add([=] { keepAliveRelease(); }); + } + loopKeepAliveCount_--; + } + + private: void applyLoopKeepAlive(); + ssize_t loopKeepAliveCount(); + /* * Helper function that tells us whether we have already handled * some event/timeout/callback in this loop iteration. */ bool nothingHandledYet() const noexcept; - // small object used as a callback arg with enough info to execute the - // appropriate client-provided Cob - class CobTimeout : public AsyncTimeout { - public: - CobTimeout(EventBase* b, Func c, TimeoutManager::InternalEnum in) - : AsyncTimeout(b, in), cob_(std::move(c)) {} - - virtual void timeoutExpired() noexcept; - - private: - Func cob_; - - public: - typedef boost::intrusive::list_member_hook< - boost::intrusive::link_mode > ListHook; - - ListHook hook; - - typedef boost::intrusive::list< - CobTimeout, - boost::intrusive::member_hook, - boost::intrusive::constant_time_size > List; - }; - typedef LoopCallback::List LoopCallbackList; class FunctionRunner; @@ -663,8 +687,6 @@ class EventBase : private boost::noncopyable, // should only be accessed through public getter HHWheelTimer::UniquePtr wheelTimer_; - CobTimeout::List pendingCobTimeouts_; - LoopCallbackList loopCallbacks_; LoopCallbackList runBeforeLoopCallbacks_; LoopCallbackList onDestructionCallbacks_; @@ -680,11 +702,8 @@ class EventBase : private boost::noncopyable, std::atomic stop_; // The ID of the thread running the main loop. - // 0 if loop is not running. - // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or - // even that atomic is valid), but that's how it is - // everywhere (at least on Linux, FreeBSD, and OSX). - std::atomic loopThread_; + // std::thread::id{} if loop is not running. + std::atomic loopThread_; // pointer to underlying event_base class doing the heavy lifting event_base* evb_; @@ -693,11 +712,12 @@ class EventBase : private boost::noncopyable, // to send function requests to the EventBase thread. std::unique_ptr> queue_; std::unique_ptr fnRunner_; - size_t loopKeepAliveCount_{0}; + ssize_t loopKeepAliveCount_{0}; + std::atomic loopKeepAliveCountAtomic_{0}; bool loopKeepAliveActive_{false}; // limit for latency in microseconds (0 disables) - int64_t maxLatency_; + std::chrono::microseconds maxLatency_; // exponentially-smoothed average loop time for latency-limiting SmoothLoopTime avgLoopTime_; @@ -715,14 +735,10 @@ class EventBase : private boost::noncopyable, // be supported: avg loop time, observer and max latency. const bool enableTimeMeasurement_; - // we'll wait this long before running deferred callbacks if the event - // loop is idle. - static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms - // Wrap-around loop counter to detect beginning of each loop uint64_t nextLoopCnt_; uint64_t latestLoopCnt_; - uint64_t startWork_; + std::chrono::steady_clock::time_point startWork_; // Prevent undefined behavior from invoking event_base_loop() reentrantly. // This is needed since many projects use libevent-1.4, which lacks commit // b557b175c00dc462c1fce25f6e7dd67121d2c001 from @@ -745,9 +761,11 @@ class EventBase : private boost::noncopyable, // see EventBaseLocal friend class detail::EventBaseLocalBase; template friend class EventBaseLocal; - std::mutex localStorageMutex_; std::unordered_map> localStorage_; std::unordered_set localStorageToDtor_; + + folly::once_flag virtualEventBaseInitFlag_; + std::unique_ptr virtualEventBase_; }; template @@ -767,4 +785,4 @@ bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait( return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); }); } -} // folly +} // namespace folly