/*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
explicit FunctionLoopCallback(const Cob& function)
: function_(function) {}
- virtual void runLoopCallback() noexcept {
+ void runLoopCallback() noexcept override {
function_();
delete this;
}
class EventBase::FunctionRunner
: public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
public:
- void messageAvailable(std::pair<void (*)(void*), void*>&& msg) {
+ void messageAvailable(std::pair<void (*)(void*), void*>&& msg) override {
// In libevent2, internal events do not break the loop.
// Most users would expect loop(), followed by runInEventBaseThread(),
* EventBase methods
*/
-EventBase::EventBase()
+EventBase::EventBase(bool enableTimeMeasurement)
: runOnceCallbacks_(nullptr)
, stop_(false)
, loopThread_(0)
, maxLatency_(0)
, avgLoopTime_(2000000)
, maxLatencyLoopTime_(avgLoopTime_)
+ , enableTimeMeasurement_(enableTimeMeasurement)
, nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon
, latestLoopCnt_(nextLoopCnt_)
, startWork_(0)
, observer_(nullptr)
- , observerSampleCount_(0) {
+ , observerSampleCount_(0)
+ , executionObserver_(nullptr) {
{
std::lock_guard<std::mutex> lock(libevent_mutex_);
}
VLOG(5) << "EventBase(): Created.";
initNotificationQueue();
- RequestContext::getStaticContext();
+ RequestContext::saveContext();
}
// takes ownership of the event_base
-EventBase::EventBase(event_base* evb)
+EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
: runOnceCallbacks_(nullptr)
, stop_(false)
, loopThread_(0)
, maxLatency_(0)
, avgLoopTime_(2000000)
, maxLatencyLoopTime_(avgLoopTime_)
+ , enableTimeMeasurement_(enableTimeMeasurement)
, nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon
, latestLoopCnt_(nextLoopCnt_)
, startWork_(0)
, observer_(nullptr)
- , observerSampleCount_(0) {
+ , observerSampleCount_(0)
+ , executionObserver_(nullptr) {
if (UNLIKELY(evb_ == nullptr)) {
LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
throw std::invalid_argument("EventBase(): event base cannot be nullptr");
}
initNotificationQueue();
- RequestContext::getStaticContext();
+ RequestContext::saveContext();
}
EventBase::~EventBase() {
// Set smoothing coefficient for loop load average; input is # of milliseconds
// for exp(-1) decay.
void EventBase::setLoadAvgMsec(uint32_t ms) {
+ assert(enableTimeMeasurement_);
uint64_t us = 1000 * ms;
if (ms > 0) {
maxLatencyLoopTime_.setTimeInterval(us);
}
void EventBase::resetLoadAvg(double value) {
+ assert(enableTimeMeasurement_);
avgLoopTime_.reset(value);
maxLatencyLoopTime_.reset(value);
}
bool blocking = !(flags & EVLOOP_NONBLOCK);
bool once = (flags & EVLOOP_ONCE);
+ // time-measurement variables.
+ std::chrono::steady_clock::time_point prev;
+ int64_t idleStart;
+ int64_t busy;
+ int64_t idle;
+
loopThread_.store(pthread_self(), std::memory_order_release);
if (!name_.empty()) {
setThreadName(name_);
}
- auto prev = std::chrono::steady_clock::now();
- int64_t idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now().time_since_epoch()).count();
+ if (enableTimeMeasurement_) {
+ prev = std::chrono::steady_clock::now();
+ idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch()).count();
+ }
// TODO: Read stop_ atomically with an acquire barrier.
while (!stop_) {
ranLoopCallbacks = runLoopCallbacks();
- int64_t busy = std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now().time_since_epoch()).count() - startWork_;
- int64_t idle = startWork_ - idleStart;
+ if (enableTimeMeasurement_) {
+ busy = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch()).count() -
+ startWork_;
+ idle = startWork_ - idleStart;
- avgLoopTime_.addSample(idle, busy);
- maxLatencyLoopTime_.addSample(idle, busy);
+ avgLoopTime_.addSample(idle, busy);
+ maxLatencyLoopTime_.addSample(idle, busy);
- if (observer_) {
- if (observerSampleCount_++ == observer_->getSampleRate()) {
- observerSampleCount_ = 0;
- observer_->loopSample(busy, idle);
+ if (observer_) {
+ if (observerSampleCount_++ == observer_->getSampleRate()) {
+ observerSampleCount_ = 0;
+ observer_->loopSample(busy, idle);
+ }
}
- }
- VLOG(11) << "EventBase " << this << " did not timeout "
- " loop time guess: " << busy + idle <<
- " idle time: " << idle <<
- " busy time: " << busy <<
- " avgLoopTime: " << avgLoopTime_.get() <<
- " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
- " maxLatency_: " << maxLatency_ <<
- " nothingHandledYet(): "<< nothingHandledYet();
-
- // see if our average loop time has exceeded our limit
- if ((maxLatency_ > 0) &&
- (maxLatencyLoopTime_.get() > double(maxLatency_))) {
- maxLatencyCob_();
- // back off temporarily -- don't keep spamming maxLatencyCob_
- // if we're only a bit over the limit
- maxLatencyLoopTime_.dampen(0.9);
- }
+ VLOG(11) << "EventBase " << this << " did not timeout "
+ " loop time guess: " << busy + idle <<
+ " idle time: " << idle <<
+ " busy time: " << busy <<
+ " avgLoopTime: " << avgLoopTime_.get() <<
+ " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
+ " maxLatency_: " << maxLatency_ <<
+ " nothingHandledYet(): "<< nothingHandledYet();
+
+ // see if our average loop time has exceeded our limit
+ if ((maxLatency_ > 0) &&
+ (maxLatencyLoopTime_.get() > double(maxLatency_))) {
+ maxLatencyCob_();
+ // back off temporarily -- don't keep spamming maxLatencyCob_
+ // if we're only a bit over the limit
+ maxLatencyLoopTime_.dampen(0.9);
+ }
- // Our loop run did real work; reset the idle timer
- idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now().time_since_epoch()).count();
+ // Our loop run did real work; reset the idle timer
+ idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch()).count();
+ } else {
+ VLOG(11) << "EventBase " << this << " did not timeout "
+ " time measurement is disabled "
+ " nothingHandledYet(): "<< nothingHandledYet();
+ }
// If the event loop indicate that there were no more events, and
// we also didn't have any loop callbacks to run, there is nothing left to
}
}
- VLOG(5) << "EventBase " << this << " loop time: " <<
- getTimeDelta(&prev).count();
+ if (enableTimeMeasurement_) {
+ VLOG(5) << "EventBase " << this << " loop time: " <<
+ getTimeDelta(&prev).count();
+ }
if (once) {
break;
}
void EventBase::runOnDestruction(LoopCallback* callback) {
- DCHECK(isInEventBaseThread());
+ std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
callback->cancelLoopCallback();
onDestructionCallbacks_.push_back(*callback);
}
SCOPE_EXIT {
std::unique_lock<std::mutex> l(m);
ready = true;
- l.unlock();
cv.notify_one();
+ // We cannot release the lock before notify_one, because a spurious
+ // wakeup in the waiting thread may lead to cv and m going out of scope
+ // prematurely.
};
fn();
});
return true;
}
-bool EventBase::runAfterDelay(const Cob& cob,
- int milliseconds,
- TimeoutManager::InternalEnum in) {
+bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(const Cob& fn) {
+ if (isInEventBaseThread()) {
+ fn();
+ return true;
+ } else {
+ return runInEventBaseThreadAndWait(fn);
+ }
+}
+
+void EventBase::runAfterDelay(const Cob& cob,
+ int milliseconds,
+ TimeoutManager::InternalEnum in) {
+ if (!tryRunAfterDelay(cob, milliseconds, in)) {
+ folly::throwSystemError(
+ "error in EventBase::runAfterDelay(), failed to schedule timeout");
+ }
+}
+
+bool EventBase::tryRunAfterDelay(const Cob& cob,
+ int milliseconds,
+ TimeoutManager::InternalEnum in) {
CobTimeout* timeout = new CobTimeout(this, cob, in);
if (!timeout->scheduleTimeout(milliseconds)) {
delete timeout;
return false;
}
-
pendingCobTimeouts_.push_back(*timeout);
return true;
}
}
bool EventBase::scheduleTimeout(AsyncTimeout* obj,
- std::chrono::milliseconds timeout) {
+ TimeoutManager::timeout_type timeout) {
assert(isInEventBaseThread());
// Set up the timeval and add the event
struct timeval tv;