X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2Fasync%2FEventBase.cpp;h=a14cdbe15526160f7eb7fd73ba857feb6eb1f0f3;hb=a4306bcdf04c95f321a96ab9b0f29de1c1e678f1;hp=5c0b78129e37a39e9f4da5ecbba45604b9363710;hpb=2d2aed32cff14f35fd3163276a0475fcd682edf5;p=folly.git diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp index 5c0b7812..a14cdbe1 100644 --- a/folly/io/async/EventBase.cpp +++ b/folly/io/async/EventBase.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2016 Facebook, Inc. + * Copyright 2014-present Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,40 +13,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #ifndef __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS #endif #include -#include -#include - -#include #include -#include -#include -#include - -namespace { -using folly::EventBase; - -class FunctionLoopCallback : public EventBase::LoopCallback { - public: - explicit FunctionLoopCallback(EventBase::Func&& function) - : function_(std::move(function)) {} - - void runLoopCallback() noexcept override { - function_(); - delete this; - } +#include +#include +#include - private: - EventBase::Func function_; -}; -} +#include +#include +#include +#include +#include +#include namespace folly { @@ -57,7 +41,7 @@ namespace folly { class EventBase::FunctionRunner : public NotificationQueue::Consumer { public: - void messageAvailable(Func&& msg) override { + void messageAvailable(Func&& msg) noexcept override { // In libevent2, internal events do not break the loop. // Most users would expect loop(), followed by runInEventBaseThread(), // to break the loop and check if it should exit or not. @@ -71,46 +55,10 @@ class EventBase::FunctionRunner // wake up the loop. We can ignore these messages. return; } - - // The function should never throw an exception, because we have no - // way of knowing what sort of error handling to perform. - // - // If it does throw, log a message and abort the program. - try { - msg(); - } catch (const std::exception& ex) { - LOG(ERROR) << "runInEventBaseThread() function threw a " - << typeid(ex).name() << " exception: " << ex.what(); - abort(); - } catch (...) { - LOG(ERROR) << "runInEventBaseThread() function threw an exception"; - abort(); - } + msg(); } }; -/* - * EventBase::CobTimeout methods - */ - -void EventBase::CobTimeout::timeoutExpired() noexcept { - // For now, we just swallow any exceptions that the callback threw. - try { - cob_(); - } catch (const std::exception& ex) { - LOG(ERROR) << "EventBase::runAfterDelay() callback threw " - << typeid(ex).name() << " exception: " << ex.what(); - } catch (...) { - LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception " - << "type"; - } - - // The CobTimeout object was allocated on the heap by runAfterDelay(), - // so delete it now that the it has fired. - delete this; -} - - // The interface used to libevent is not thread-safe. Calls to // event_init() and event_base_free() directly modify an internal // global 'current_base', so a mutex is required to protect this. @@ -131,12 +79,12 @@ EventBase::EventBase(bool enableTimeMeasurement) , queue_(nullptr) , fnRunner_(nullptr) , maxLatency_(0) - , avgLoopTime_(2000000) + , avgLoopTime_(std::chrono::seconds(2)) , maxLatencyLoopTime_(avgLoopTime_) , enableTimeMeasurement_(enableTimeMeasurement) - , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon + , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon , latestLoopCnt_(nextLoopCnt_) - , startWork_(0) + , startWork_() , observer_(nullptr) , observerSampleCount_(0) , executionObserver_(nullptr) { @@ -147,7 +95,7 @@ EventBase::EventBase(bool enableTimeMeasurement) // The value 'current_base' (libevent 1) or // 'event_global_current_base_' (libevent 2) is filled in by event_set(), // allowing examination of its value without an explicit reference here. - // If ev.ev_base is NULL, then event_init() must be called, otherwise + // If ev.ev_base is nullptr, then event_init() must be called, otherwise // call event_base_new(). event_set(&ev, 0, 0, nullptr, nullptr); if (!ev.ev_base) { @@ -165,7 +113,6 @@ EventBase::EventBase(bool enableTimeMeasurement) } VLOG(5) << "EventBase(): Created."; initNotificationQueue(); - RequestContext::saveContext(); } // takes ownership of the event_base @@ -177,12 +124,12 @@ EventBase::EventBase(event_base* evb, bool enableTimeMeasurement) , queue_(nullptr) , fnRunner_(nullptr) , maxLatency_(0) - , avgLoopTime_(2000000) + , avgLoopTime_(std::chrono::seconds(2)) , maxLatencyLoopTime_(avgLoopTime_) , enableTimeMeasurement_(enableTimeMeasurement) - , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon + , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon , latestLoopCnt_(nextLoopCnt_) - , startWork_(0) + , startWork_() , observer_(nullptr) , observerSampleCount_(0) , executionObserver_(nullptr) { @@ -191,10 +138,26 @@ EventBase::EventBase(event_base* evb, bool enableTimeMeasurement) throw std::invalid_argument("EventBase(): event base cannot be nullptr"); } initNotificationQueue(); - RequestContext::saveContext(); } EventBase::~EventBase() { + std::future virtualEventBaseDestroyFuture; + if (virtualEventBase_) { + virtualEventBaseDestroyFuture = virtualEventBase_->destroy(); + } + + // Keep looping until all keep-alive handles are released. Each keep-alive + // handle signals that some external code will still schedule some work on + // this EventBase (so it's not safe to destroy it). + while (loopKeepAliveCount() > 0) { + applyLoopKeepAlive(); + loopOnce(); + } + + if (virtualEventBaseDestroyFuture.valid()) { + virtualEventBaseDestroyFuture.get(); + } + // Call all destruction callbacks, before we start cleaning up our state. while (!onDestructionCallbacks_.empty()) { LoopCallback* callback = &onDestructionCallbacks_.front(); @@ -202,20 +165,11 @@ EventBase::~EventBase() { callback->runLoopCallback(); } - // Delete any unfired callback objects, so that we don't leak memory - // (Note that we don't fire them. The caller is responsible for cleaning up - // its own data structures if it destroys the EventBase with unfired events - // remaining.) - while (!pendingCobTimeouts_.empty()) { - CobTimeout* timeout = &pendingCobTimeouts_.front(); - delete timeout; - } + clearCobTimeouts(); - while (!runBeforeLoopCallbacks_.empty()) { - delete &runBeforeLoopCallbacks_.front(); - } + DCHECK_EQ(0u, runBeforeLoopCallbacks_.size()); - (void) runLoopCallbacks(false); + (void)runLoopCallbacks(); if (!fnRunner_->consumeUntilDrained()) { LOG(ERROR) << "~EventBase(): Unable to drain notification queue"; @@ -228,22 +182,14 @@ EventBase::~EventBase() { event_base_free(evb_); } - while (!runAfterDrainCallbacks_.empty()) { - LoopCallback* callback = &runAfterDrainCallbacks_.front(); - runAfterDrainCallbacks_.pop_front(); - callback->runLoopCallback(); + for (auto storage : localStorageToDtor_) { + storage->onEventBaseDestruction(*this); } - { - std::lock_guard lock(localStorageMutex_); - for (auto storage : localStorageToDtor_) { - storage->onEventBaseDestruction(*this); - } - } VLOG(5) << "EventBase(): Destroyed."; } -int EventBase::getNotificationQueueSize() const { +size_t EventBase::getNotificationQueueSize() const { return queue_->size(); } @@ -251,12 +197,29 @@ void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) { fnRunner_->setMaxReadAtOnce(maxAtOnce); } +void EventBase::checkIsInEventBaseThread() const { + auto evbTid = loopThread_.load(std::memory_order_relaxed); + if (evbTid == std::thread::id()) { + return; + } + + // Using getThreadName(evbTid) instead of name_ will work also if + // the thread name is set outside of EventBase (and name_ is empty). + auto curTid = std::this_thread::get_id(); + CHECK(evbTid == curTid) + << "This logic must be executed in the event base thread. " + << "Event base thread name: \"" + << folly::getThreadName(evbTid).value_or("") + << "\", current thread name: \"" + << folly::getThreadName(curTid).value_or("") << "\""; +} + // Set smoothing coefficient for loop load average; input is # of milliseconds // for exp(-1) decay. -void EventBase::setLoadAvgMsec(uint32_t ms) { +void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) { assert(enableTimeMeasurement_); - uint64_t us = 1000 * ms; - if (ms > 0) { + std::chrono::microseconds us = std::chrono::milliseconds(ms); + if (ms > std::chrono::milliseconds::zero()) { maxLatencyLoopTime_.setTimeInterval(us); avgLoopTime_.setTimeInterval(us); } else { @@ -280,7 +243,7 @@ getTimeDelta(std::chrono::steady_clock::time_point* prev) { void EventBase::waitUntilRunning() { while (!isRunning()) { - sched_yield(); + std::this_thread::yield(); } } @@ -295,6 +258,19 @@ bool EventBase::loopOnce(int flags) { bool EventBase::loopBody(int flags) { VLOG(5) << "EventBase(): Starting loop."; + + DCHECK(!invokingLoop_) + << "Your code just tried to loop over an event base from inside another " + << "event base loop. Since libevent is not reentrant, this leads to " + << "undefined behavior in opt builds. Please fix immediately. For the " + << "common case of an inner function that needs to do some synchronous " + << "computation on an event-base, replace getEventBase() by a new, " + << "stack-allocated EvenBase."; + invokingLoop_ = true; + SCOPE_EXIT { + invokingLoop_ = false; + }; + int res = 0; bool ranLoopCallbacks; bool blocking = !(flags & EVLOOP_NONBLOCK); @@ -302,11 +278,11 @@ bool EventBase::loopBody(int flags) { // time-measurement variables. std::chrono::steady_clock::time_point prev; - int64_t idleStart = 0; - int64_t busy; - int64_t idle; + std::chrono::steady_clock::time_point idleStart = {}; + std::chrono::microseconds busy; + std::chrono::microseconds idle; - loopThread_.store(pthread_self(), std::memory_order_release); + loopThread_.store(std::this_thread::get_id(), std::memory_order_release); if (!name_.empty()) { setThreadName(name_); @@ -314,11 +290,10 @@ bool EventBase::loopBody(int flags) { if (enableTimeMeasurement_) { prev = std::chrono::steady_clock::now(); - idleStart = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count(); + idleStart = prev; } - while (!stop_.load(std::memory_order_acquire)) { + while (!stop_.load(std::memory_order_relaxed)) { applyLoopKeepAlive(); ++nextLoopCnt_; @@ -343,34 +318,36 @@ bool EventBase::loopBody(int flags) { ranLoopCallbacks = runLoopCallbacks(); if (enableTimeMeasurement_) { + auto now = std::chrono::steady_clock::now(); busy = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count() - - startWork_; - idle = startWork_ - idleStart; + now - startWork_); + idle = std::chrono::duration_cast( + startWork_ - idleStart); + auto loop_time = busy + idle; - avgLoopTime_.addSample(idle, busy); - maxLatencyLoopTime_.addSample(idle, busy); + avgLoopTime_.addSample(loop_time, busy); + maxLatencyLoopTime_.addSample(loop_time, busy); if (observer_) { if (observerSampleCount_++ == observer_->getSampleRate()) { observerSampleCount_ = 0; - observer_->loopSample(busy, idle); + observer_->loopSample(busy.count(), idle.count()); } } - VLOG(11) << "EventBase " << this << " did not timeout " - " loop time guess: " << busy + idle << - " idle time: " << idle << - " busy time: " << busy << - " avgLoopTime: " << avgLoopTime_.get() << - " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() << - " maxLatency_: " << maxLatency_ << - " notificationQueueSize: " << getNotificationQueueSize() << - " nothingHandledYet(): "<< nothingHandledYet(); + VLOG(11) << "EventBase " << this << " did not timeout " + << " loop time guess: " << loop_time.count() + << " idle time: " << idle.count() + << " busy time: " << busy.count() + << " avgLoopTime: " << avgLoopTime_.get() + << " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() + << " maxLatency_: " << maxLatency_.count() << "us" + << " notificationQueueSize: " << getNotificationQueueSize() + << " nothingHandledYet(): " << nothingHandledYet(); // see if our average loop time has exceeded our limit - if ((maxLatency_ > 0) && - (maxLatencyLoopTime_.get() > double(maxLatency_))) { + if ((maxLatency_ > std::chrono::microseconds::zero()) && + (maxLatencyLoopTime_.get() > double(maxLatency_.count()))) { maxLatencyCob_(); // back off temporarily -- don't keep spamming maxLatencyCob_ // if we're only a bit over the limit @@ -378,8 +355,7 @@ bool EventBase::loopBody(int flags) { } // Our loop run did real work; reset the idle timer - idleStart = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count(); + idleStart = now; } else { VLOG(11) << "EventBase " << this << " did not timeout"; } @@ -399,7 +375,7 @@ bool EventBase::loopBody(int flags) { } if (enableTimeMeasurement_) { - VLOG(5) << "EventBase " << this << " loop time: " << + VLOG(11) << "EventBase " << this << " loop time: " << getTimeDelta(&prev).count(); } @@ -408,7 +384,7 @@ bool EventBase::loopBody(int flags) { } } // Reset stop_ so loop() can be called again - stop_ = false; + stop_.store(false, std::memory_order_relaxed); if (res < 0) { LOG(ERROR) << "EventBase: -- error in event loop, res = " << res; @@ -426,13 +402,29 @@ bool EventBase::loopBody(int flags) { return true; } +ssize_t EventBase::loopKeepAliveCount() { + if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) { + loopKeepAliveCount_ += + loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed); + } + DCHECK_GE(loopKeepAliveCount_, 0); + + return loopKeepAliveCount_; +} + void EventBase::applyLoopKeepAlive() { - if (loopKeepAliveActive_ && loopKeepAlive_.unique()) { + auto keepAliveCount = loopKeepAliveCount(); + // Make sure default VirtualEventBase won't hold EventBase::loop() forever. + if (virtualEventBase_ && virtualEventBase_->keepAliveCount() == 1) { + --keepAliveCount; + } + + if (loopKeepAliveActive_ && keepAliveCount == 0) { // Restore the notification queue internal flag fnRunner_->stopConsuming(); fnRunner_->startConsumingInternal(this, queue_.get()); loopKeepAliveActive_ = false; - } else if (!loopKeepAliveActive_ && !loopKeepAlive_.unique()) { + } else if (!loopKeepAliveActive_ && keepAliveCount > 0) { // Update the notification queue event to treat it as a normal // (non-internal) event. The notification queue event always remains // installed, and the main loop won't exit with it installed. @@ -447,11 +439,14 @@ void EventBase::loopForever() { { SCOPE_EXIT { applyLoopKeepAlive(); - loopForeverActive_ = false; }; - loopForeverActive_ = true; // Make sure notification queue events are treated as normal events. - auto loopKeepAlive = loopKeepAlive_; + // We can't use loopKeepAlive() here since LoopKeepAlive token can only be + // released inside a loop. + ++loopKeepAliveCount_; + SCOPE_EXIT { + --loopKeepAliveCount_; + }; ret = loop(); } @@ -470,12 +465,10 @@ void EventBase::bumpHandlingTime() { if (nothingHandledYet()) { latestLoopCnt_ = nextLoopCnt_; // set the time - startWork_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + startWork_ = std::chrono::steady_clock::now(); VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ - << " (loop) startWork_ " << startWork_; + << " (loop) startWork_ " << startWork_.time_since_epoch().count(); } } @@ -483,9 +476,7 @@ void EventBase::terminateLoopSoon() { VLOG(5) << "EventBase(): Received terminateLoopSoon() command."; // Set stop to true, so the event loop will know to exit. - // TODO: We should really use an atomic operation here with a release - // barrier. - stop_ = true; + stop_.store(true, std::memory_order_relaxed); // Call event_base_loopbreak() so that libevent will exit the next time // around the loop. @@ -509,7 +500,7 @@ void EventBase::terminateLoopSoon() { } void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) { - DCHECK(isInEventBaseThread()); + dcheckIsInEventBaseThread(); callback->cancelLoopCallback(); callback->context_ = RequestContext::saveContext(); if (runOnceCallbacks_ != nullptr && thisIteration) { @@ -520,7 +511,7 @@ void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) { } void EventBase::runInLoop(Func cob, bool thisIteration) { - DCHECK(isInEventBaseThread()); + dcheckIsInEventBaseThread(); auto wrapper = new FunctionLoopCallback(std::move(cob)); wrapper->context_ = RequestContext::saveContext(); if (runOnceCallbacks_ != nullptr && thisIteration) { @@ -530,13 +521,6 @@ void EventBase::runInLoop(Func cob, bool thisIteration) { } } -void EventBase::runAfterDrain(Func cob) { - auto callback = new FunctionLoopCallback(std::move(cob)); - std::lock_guard lg(runAfterDrainCallbacksMutex_); - callback->cancelLoopCallback(); - runAfterDrainCallbacks_.push_back(*callback); -} - void EventBase::runOnDestruction(LoopCallback* callback) { std::lock_guard lg(onDestructionCallbacksMutex_); callback->cancelLoopCallback(); @@ -544,7 +528,7 @@ void EventBase::runOnDestruction(LoopCallback* callback) { } void EventBase::runBeforeLoop(LoopCallback* callback) { - DCHECK(isInEventBaseThread()); + dcheckIsInEventBaseThread(); callback->cancelLoopCallback(); runBeforeLoopCallbacks_.push_back(*callback); } @@ -564,7 +548,6 @@ bool EventBase::runInEventBaseThread(Func fn) { if (inRunningEventBaseThread()) { runInLoop(std::move(fn)); return true; - } try { @@ -585,22 +568,16 @@ bool EventBase::runInEventBaseThreadAndWait(Func fn) { return false; } - bool ready = false; - std::mutex m; - std::condition_variable cv; - runInEventBaseThread([&] { - SCOPE_EXIT { - std::unique_lock l(m); - ready = true; - cv.notify_one(); - // We cannot release the lock before notify_one, because a spurious - // wakeup in the waiting thread may lead to cv and m going out of scope - // prematurely. - }; - fn(); + Baton<> ready; + runInEventBaseThread([&ready, fn = std::move(fn)]() mutable { + SCOPE_EXIT { + ready.post(); + }; + // A trick to force the stored functor to be executed and then destructed + // before posting the baton and waking the waiting thread. + copy(std::move(fn))(); }); - std::unique_lock l(m); - cv.wait(l, [&] { return ready; }); + ready.wait(); return true; } @@ -614,30 +591,7 @@ bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) { } } -void EventBase::runAfterDelay( - Func cob, - uint32_t milliseconds, - TimeoutManager::InternalEnum in) { - if (!tryRunAfterDelay(std::move(cob), milliseconds, in)) { - folly::throwSystemError( - "error in EventBase::runAfterDelay(), failed to schedule timeout"); - } -} - -bool EventBase::tryRunAfterDelay( - Func cob, - uint32_t milliseconds, - TimeoutManager::InternalEnum in) { - CobTimeout* timeout = new CobTimeout(this, std::move(cob), in); - if (!timeout->scheduleTimeout(milliseconds)) { - delete timeout; - return false; - } - pendingCobTimeouts_.push_back(*timeout); - return true; -} - -bool EventBase::runLoopCallbacks(bool setContext) { +bool EventBase::runLoopCallbacks() { if (!loopCallbacks_.empty()) { bumpHandlingTime(); // Swap the loopCallbacks_ list with a temporary list on our stack. @@ -655,9 +609,7 @@ bool EventBase::runLoopCallbacks(bool setContext) { while (!currentCallbacks.empty()) { LoopCallback* callback = ¤tCallbacks.front(); currentCallbacks.pop_front(); - if (setContext) { - RequestContext::setContext(callback->context_); - } + folly::RequestContextScopeGuard rctx(std::move(callback->context_)); callback->runLoopCallback(); } @@ -669,12 +621,12 @@ bool EventBase::runLoopCallbacks(bool setContext) { void EventBase::initNotificationQueue() { // Infinite size queue - queue_.reset(new NotificationQueue()); + queue_ = std::make_unique>(); // We allocate fnRunner_ separately, rather than declaring it directly // as a member of EventBase solely so that we don't need to include // NotificationQueue.h from EventBase.h - fnRunner_.reset(new FunctionRunner()); + fnRunner_ = std::make_unique(); // Mark this as an internal event, so event_base_loop() will return if // there are no other events besides this one installed. @@ -688,8 +640,9 @@ void EventBase::initNotificationQueue() { fnRunner_->startConsumingInternal(this, queue_.get()); } -void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) { - expCoeff_ = -1.0/timeInterval; +void EventBase::SmoothLoopTime::setTimeInterval( + std::chrono::microseconds timeInterval) { + expCoeff_ = -1.0 / timeInterval.count(); VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__; } @@ -697,29 +650,22 @@ void EventBase::SmoothLoopTime::reset(double value) { value_ = value; } -void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) { - /* - * Position at which the busy sample is considered to be taken. - * (Allows to quickly skew our average without editing much code) - */ - enum BusySamplePosition { - RIGHT = 0, // busy sample placed at the end of the iteration - CENTER = 1, // busy sample placed at the middle point of the iteration - LEFT = 2, // busy sample placed at the beginning of the iteration - }; - - // See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average - // and D676020 for more info on this calculation. - VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ << - " idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ << - " busy " << busy << " " << __PRETTY_FUNCTION__; - idle += oldBusyLeftover_ + busy; - oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2; - idle -= oldBusyLeftover_; - - double coeff = exp(idle * expCoeff_); - value_ *= coeff; - value_ += (1.0 - coeff) * busy; +void EventBase::SmoothLoopTime::addSample( + std::chrono::microseconds total, + std::chrono::microseconds busy) { + if ((buffer_time_ + total) > buffer_interval_ && buffer_cnt_ > 0) { + // See https://en.wikipedia.org/wiki/Exponential_smoothing for + // more info on this calculation. + double coeff = exp(buffer_time_.count() * expCoeff_); + value_ = + value_ * coeff + (1.0 - coeff) * (busy_buffer_.count() / buffer_cnt_); + buffer_time_ = std::chrono::microseconds{0}; + busy_buffer_ = std::chrono::microseconds{0}; + buffer_cnt_ = 0; + } + buffer_time_ += total; + busy_buffer_ += busy; + buffer_cnt_++; } bool EventBase::nothingHandledYet() const noexcept { @@ -748,11 +694,11 @@ void EventBase::detachTimeoutManager(AsyncTimeout* obj) { bool EventBase::scheduleTimeout(AsyncTimeout* obj, TimeoutManager::timeout_type timeout) { - assert(isInEventBaseThread()); + dcheckIsInEventBaseThread(); // Set up the timeval and add the event struct timeval tv; - tv.tv_sec = timeout.count() / 1000LL; - tv.tv_usec = (timeout.count() % 1000LL) * 1000LL; + tv.tv_sec = long(timeout.count() / 1000LL); + tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL); struct event* ev = obj->getEvent(); if (event_add(ev, &tv) < 0) { @@ -764,7 +710,7 @@ bool EventBase::scheduleTimeout(AsyncTimeout* obj, } void EventBase::cancelTimeout(AsyncTimeout* obj) { - assert(isInEventBaseThread()); + dcheckIsInEventBaseThread(); struct event* ev = obj->getEvent(); if (EventUtil::isEventRegistered(ev)) { event_del(ev); @@ -772,7 +718,7 @@ void EventBase::cancelTimeout(AsyncTimeout* obj) { } void EventBase::setName(const std::string& name) { - assert(isInEventBaseThread()); + dcheckIsInEventBaseThread(); name_ = name; if (isRunning()) { @@ -782,11 +728,20 @@ void EventBase::setName(const std::string& name) { } const std::string& EventBase::getName() { - assert(isInEventBaseThread()); + dcheckIsInEventBaseThread(); return name_; } const char* EventBase::getLibeventVersion() { return event_get_version(); } const char* EventBase::getLibeventMethod() { return event_get_method(); } -} // folly +VirtualEventBase& EventBase::getVirtualEventBase() { + folly::call_once(virtualEventBaseInitFlag_, [&] { + virtualEventBase_ = std::make_unique(*this); + }); + + return *virtualEventBase_; +} + +constexpr std::chrono::milliseconds EventBase::SmoothLoopTime::buffer_interval_; +} // namespace folly