X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2Fasync%2FEventBase.cpp;h=b8a1c25675b6c9c408f7fc1260d3bbf0621916be;hb=7f22ad99b5c4583d8f579911b8907c7758a4151e;hp=00aa9d80d0a0b7ea63ba31453de1afda67c4bb1f;hpb=1219e49411f20b15f0d577375891d5d20a20e254;p=folly.git diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp index 00aa9d80..b8a1c256 100644 --- a/folly/io/async/EventBase.cpp +++ b/folly/io/async/EventBase.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2014 Facebook, Inc. + * Copyright 2015 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,12 +20,13 @@ #include -#include #include #include #include +#include #include +#include #include #include @@ -43,7 +44,7 @@ class FunctionLoopCallback : public EventBase::LoopCallback { explicit FunctionLoopCallback(const Cob& function) : function_(function) {} - virtual void runLoopCallback() noexcept { + void runLoopCallback() noexcept override { function_(); delete this; } @@ -65,7 +66,7 @@ const int kNoFD = -1; class EventBase::FunctionRunner : public NotificationQueue>::Consumer { public: - void messageAvailable(std::pair&& msg) { + void messageAvailable(std::pair&& msg) override { // In libevent2, internal events do not break the loop. // Most users would expect loop(), followed by runInEventBaseThread(), @@ -140,7 +141,7 @@ static std::mutex libevent_mutex_; * EventBase methods */ -EventBase::EventBase() +EventBase::EventBase(bool enableTimeMeasurement) : runOnceCallbacks_(nullptr) , stop_(false) , loopThread_(0) @@ -149,11 +150,13 @@ EventBase::EventBase() , maxLatency_(0) , avgLoopTime_(2000000) , maxLatencyLoopTime_(avgLoopTime_) + , enableTimeMeasurement_(enableTimeMeasurement) , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon , latestLoopCnt_(nextLoopCnt_) , startWork_(0) , observer_(nullptr) - , observerSampleCount_(0) { + , observerSampleCount_(0) + , executionObserver_(nullptr) { { std::lock_guard lock(libevent_mutex_); @@ -172,11 +175,11 @@ EventBase::EventBase() } VLOG(5) << "EventBase(): Created."; initNotificationQueue(); - RequestContext::getStaticContext(); + RequestContext::saveContext(); } // takes ownership of the event_base -EventBase::EventBase(event_base* evb) +EventBase::EventBase(event_base* evb, bool enableTimeMeasurement) : runOnceCallbacks_(nullptr) , stop_(false) , loopThread_(0) @@ -186,17 +189,19 @@ EventBase::EventBase(event_base* evb) , maxLatency_(0) , avgLoopTime_(2000000) , maxLatencyLoopTime_(avgLoopTime_) + , enableTimeMeasurement_(enableTimeMeasurement) , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon , latestLoopCnt_(nextLoopCnt_) , startWork_(0) , observer_(nullptr) - , observerSampleCount_(0) { + , observerSampleCount_(0) + , executionObserver_(nullptr) { if (UNLIKELY(evb_ == nullptr)) { LOG(ERROR) << "EventBase(): Pass nullptr as event base."; throw std::invalid_argument("EventBase(): event base cannot be nullptr"); } initNotificationQueue(); - RequestContext::getStaticContext(); + RequestContext::saveContext(); } EventBase::~EventBase() { @@ -246,6 +251,7 @@ void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) { // Set smoothing coefficient for loop load average; input is # of milliseconds // for exp(-1) decay. void EventBase::setLoadAvgMsec(uint32_t ms) { + assert(enableTimeMeasurement_); uint64_t us = 1000 * ms; if (ms > 0) { maxLatencyLoopTime_.setTimeInterval(us); @@ -256,6 +262,7 @@ void EventBase::setLoadAvgMsec(uint32_t ms) { } void EventBase::resetLoadAvg(double value) { + assert(enableTimeMeasurement_); avgLoopTime_.reset(value); maxLatencyLoopTime_.reset(value); } @@ -290,15 +297,23 @@ bool EventBase::loopBody(int flags) { bool blocking = !(flags & EVLOOP_NONBLOCK); bool once = (flags & EVLOOP_ONCE); + // time-measurement variables. + std::chrono::steady_clock::time_point prev; + int64_t idleStart; + int64_t busy; + int64_t idle; + loopThread_.store(pthread_self(), std::memory_order_release); if (!name_.empty()) { setThreadName(name_); } - auto prev = std::chrono::steady_clock::now(); - int64_t idleStart = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count(); + if (enableTimeMeasurement_) { + prev = std::chrono::steady_clock::now(); + idleStart = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + } // TODO: Read stop_ atomically with an acquire barrier. while (!stop_) { @@ -324,41 +339,48 @@ bool EventBase::loopBody(int flags) { ranLoopCallbacks = runLoopCallbacks(); - int64_t busy = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count() - startWork_; - int64_t idle = startWork_ - idleStart; + if (enableTimeMeasurement_) { + busy = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count() - + startWork_; + idle = startWork_ - idleStart; - avgLoopTime_.addSample(idle, busy); - maxLatencyLoopTime_.addSample(idle, busy); + avgLoopTime_.addSample(idle, busy); + maxLatencyLoopTime_.addSample(idle, busy); - if (observer_) { - if (observerSampleCount_++ == observer_->getSampleRate()) { - observerSampleCount_ = 0; - observer_->loopSample(busy, idle); + if (observer_) { + if (observerSampleCount_++ == observer_->getSampleRate()) { + observerSampleCount_ = 0; + observer_->loopSample(busy, idle); + } } - } - VLOG(11) << "EventBase " << this << " did not timeout " - " loop time guess: " << busy + idle << - " idle time: " << idle << - " busy time: " << busy << - " avgLoopTime: " << avgLoopTime_.get() << - " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() << - " maxLatency_: " << maxLatency_ << - " nothingHandledYet(): "<< nothingHandledYet(); - - // see if our average loop time has exceeded our limit - if ((maxLatency_ > 0) && - (maxLatencyLoopTime_.get() > double(maxLatency_))) { - maxLatencyCob_(); - // back off temporarily -- don't keep spamming maxLatencyCob_ - // if we're only a bit over the limit - maxLatencyLoopTime_.dampen(0.9); - } + VLOG(11) << "EventBase " << this << " did not timeout " + " loop time guess: " << busy + idle << + " idle time: " << idle << + " busy time: " << busy << + " avgLoopTime: " << avgLoopTime_.get() << + " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() << + " maxLatency_: " << maxLatency_ << + " nothingHandledYet(): "<< nothingHandledYet(); + + // see if our average loop time has exceeded our limit + if ((maxLatency_ > 0) && + (maxLatencyLoopTime_.get() > double(maxLatency_))) { + maxLatencyCob_(); + // back off temporarily -- don't keep spamming maxLatencyCob_ + // if we're only a bit over the limit + maxLatencyLoopTime_.dampen(0.9); + } - // Our loop run did real work; reset the idle timer - idleStart = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count(); + // Our loop run did real work; reset the idle timer + idleStart = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + } else { + VLOG(11) << "EventBase " << this << " did not timeout " + " time measurement is disabled " + " nothingHandledYet(): "<< nothingHandledYet(); + } // If the event loop indicate that there were no more events, and // we also didn't have any loop callbacks to run, there is nothing left to @@ -374,8 +396,10 @@ bool EventBase::loopBody(int flags) { } } - VLOG(5) << "EventBase " << this << " loop time: " << - getTimeDelta(&prev).count(); + if (enableTimeMeasurement_) { + VLOG(5) << "EventBase " << this << " loop time: " << + getTimeDelta(&prev).count(); + } if (once) { break; @@ -497,7 +521,7 @@ void EventBase::runInLoop(Cob&& cob, bool thisIteration) { } void EventBase::runOnDestruction(LoopCallback* callback) { - DCHECK(isInEventBaseThread()); + std::lock_guard lg(onDestructionCallbacksMutex_); callback->cancelLoopCallback(); onDestructionCallbacks_.push_back(*callback); } @@ -563,49 +587,59 @@ bool EventBase::runInEventBaseThread(const Cob& fn) { return true; } -bool EventBase::runInEventBaseThreadAndWait(void (*fn)(void*), void* arg) { +bool EventBase::runInEventBaseThreadAndWait(const Cob& fn) { if (inRunningEventBaseThread()) { LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not " << "allowed"; return false; } - Baton<> ready; + bool ready = false; + std::mutex m; + std::condition_variable cv; runInEventBaseThread([&] { - fn(arg); - ready.post(); + SCOPE_EXIT { + std::unique_lock l(m); + ready = true; + cv.notify_one(); + // We cannot release the lock before notify_one, because a spurious + // wakeup in the waiting thread may lead to cv and m going out of scope + // prematurely. + }; + fn(); }); - ready.wait(); + std::unique_lock l(m); + cv.wait(l, [&] { return ready; }); return true; } -bool EventBase::runInEventBaseThreadAndWait(const Cob& fn) { - if (inRunningEventBaseThread()) { - LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not " - << "allowed"; - return false; +bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(const Cob& fn) { + if (isInEventBaseThread()) { + fn(); + return true; + } else { + return runInEventBaseThreadAndWait(fn); } +} - Baton<> ready; - runInEventBaseThread([&] { - fn(); - ready.post(); - }); - ready.wait(); - - return true; +void EventBase::runAfterDelay(const Cob& cob, + int milliseconds, + TimeoutManager::InternalEnum in) { + if (!tryRunAfterDelay(cob, milliseconds, in)) { + folly::throwSystemError( + "error in EventBase::runAfterDelay(), failed to schedule timeout"); + } } -bool EventBase::runAfterDelay(const Cob& cob, - int milliseconds, - TimeoutManager::InternalEnum in) { +bool EventBase::tryRunAfterDelay(const Cob& cob, + int milliseconds, + TimeoutManager::InternalEnum in) { CobTimeout* timeout = new CobTimeout(this, cob, in); if (!timeout->scheduleTimeout(milliseconds)) { delete timeout; return false; } - pendingCobTimeouts_.push_back(*timeout); return true; } @@ -751,7 +785,7 @@ void EventBase::detachTimeoutManager(AsyncTimeout* obj) { } bool EventBase::scheduleTimeout(AsyncTimeout* obj, - std::chrono::milliseconds timeout) { + TimeoutManager::timeout_type timeout) { assert(isInEventBaseThread()); // Set up the timeval and add the event struct timeval tv;