X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2Fasync%2FEventBase.cpp;h=24883d9b96a2f387439ccaa46265ef7d3239f4c8;hp=9415e05a6c59cbeee0fd946551c101eafe29d638;hb=02cae39d6c0c8f67e788d6f3b282ad22a4240194;hpb=ce64f0f685111ac24c7a321ea56d0c3524621df1 diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp index 9415e05a..24883d9b 100644 --- a/folly/io/async/EventBase.cpp +++ b/folly/io/async/EventBase.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2014 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,50 +22,23 @@ #include #include +#include -#include +#include #include +#include #include -#include - -namespace { - -using folly::Cob; -using folly::EventBase; - -template -class FunctionLoopCallback : public EventBase::LoopCallback { - public: - explicit FunctionLoopCallback(Cob&& function) - : function_(std::move(function)) {} - - explicit FunctionLoopCallback(const Cob& function) - : function_(function) {} - - virtual void runLoopCallback() noexcept { - function_(); - delete this; - } - - private: - Callback function_; -}; - -} namespace folly { -const int kNoFD = -1; - /* * EventBase::FunctionRunner */ class EventBase::FunctionRunner - : public NotificationQueue>::Consumer { + : public NotificationQueue::Consumer { public: - void messageAvailable(std::pair&& msg) { - + void messageAvailable(Func&& msg) override { // In libevent2, internal events do not break the loop. // Most users would expect loop(), followed by runInEventBaseThread(), // to break the loop and check if it should exit or not. @@ -74,25 +47,18 @@ class EventBase::FunctionRunner // stop_ flag as well as runInLoop callbacks, etc. event_base_loopbreak(getEventBase()->evb_); - if (msg.first == nullptr && msg.second == nullptr) { + if (!msg) { // terminateLoopSoon() sends a null message just to // wake up the loop. We can ignore these messages. return; } - // If function is nullptr, just log and move on - if (!msg.first) { - LOG(ERROR) << "nullptr callback registered to be run in " - << "event base thread"; - return; - } - // The function should never throw an exception, because we have no // way of knowing what sort of error handling to perform. // // If it does throw, log a message and abort the program. try { - msg.first(msg.second); + msg(); } catch (const std::exception& ex) { LOG(ERROR) << "runInEventBaseThread() function threw a " << typeid(ex).name() << " exception: " << ex.what(); @@ -104,80 +70,98 @@ class EventBase::FunctionRunner } }; -/* - * EventBase::CobTimeout methods - */ - -void EventBase::CobTimeout::timeoutExpired() noexcept { - // For now, we just swallow any exceptions that the callback threw. - try { - cob_(); - } catch (const std::exception& ex) { - LOG(ERROR) << "EventBase::runAfterDelay() callback threw " - << typeid(ex).name() << " exception: " << ex.what(); - } catch (...) { - LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception " - << "type"; - } - - // The CobTimeout object was allocated on the heap by runAfterDelay(), - // so delete it now that the it has fired. - delete this; -} +// The interface used to libevent is not thread-safe. Calls to +// event_init() and event_base_free() directly modify an internal +// global 'current_base', so a mutex is required to protect this. +// +// event_init() should only ever be called once. Subsequent calls +// should be made to event_base_new(). We can recognise that +// event_init() has already been called by simply inspecting current_base. +static std::mutex libevent_mutex_; /* * EventBase methods */ -EventBase::EventBase() +EventBase::EventBase(bool enableTimeMeasurement) : runOnceCallbacks_(nullptr) , stop_(false) - , loopThread_(0) - , evb_(static_cast(event_init())) + , loopThread_() , queue_(nullptr) , fnRunner_(nullptr) , maxLatency_(0) - , avgLoopTime_(2000000) + , avgLoopTime_(std::chrono::milliseconds(2000000)) , maxLatencyLoopTime_(avgLoopTime_) - , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon + , enableTimeMeasurement_(enableTimeMeasurement) + , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon , latestLoopCnt_(nextLoopCnt_) , startWork_(0) , observer_(nullptr) - , observerSampleCount_(0) { + , observerSampleCount_(0) + , executionObserver_(nullptr) { + struct event ev; + { + std::lock_guard lock(libevent_mutex_); + + // The value 'current_base' (libevent 1) or + // 'event_global_current_base_' (libevent 2) is filled in by event_set(), + // allowing examination of its value without an explicit reference here. + // If ev.ev_base is NULL, then event_init() must be called, otherwise + // call event_base_new(). + event_set(&ev, 0, 0, nullptr, nullptr); + if (!ev.ev_base) { + evb_ = event_init(); + } + } + + if (ev.ev_base) { + evb_ = event_base_new(); + } + if (UNLIKELY(evb_ == nullptr)) { LOG(ERROR) << "EventBase(): Failed to init event base."; folly::throwSystemError("error in EventBase::EventBase()"); } VLOG(5) << "EventBase(): Created."; initNotificationQueue(); - RequestContext::getStaticContext(); + RequestContext::saveContext(); } // takes ownership of the event_base -EventBase::EventBase(event_base* evb) +EventBase::EventBase(event_base* evb, bool enableTimeMeasurement) : runOnceCallbacks_(nullptr) , stop_(false) - , loopThread_(0) + , loopThread_() , evb_(evb) , queue_(nullptr) , fnRunner_(nullptr) , maxLatency_(0) - , avgLoopTime_(2000000) + , avgLoopTime_(std::chrono::milliseconds(2000000)) , maxLatencyLoopTime_(avgLoopTime_) - , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon + , enableTimeMeasurement_(enableTimeMeasurement) + , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon , latestLoopCnt_(nextLoopCnt_) , startWork_(0) , observer_(nullptr) - , observerSampleCount_(0) { + , observerSampleCount_(0) + , executionObserver_(nullptr) { if (UNLIKELY(evb_ == nullptr)) { LOG(ERROR) << "EventBase(): Pass nullptr as event base."; throw std::invalid_argument("EventBase(): event base cannot be nullptr"); } initNotificationQueue(); - RequestContext::getStaticContext(); + RequestContext::saveContext(); } EventBase::~EventBase() { + // Keep looping until all keep-alive handles are released. Each keep-alive + // handle signals that some external code will still schedule some work on + // this EventBase (so it's not safe to destroy it). + while (loopKeepAliveCount() > 0) { + applyLoopKeepAlive(); + loopOnce(); + } + // Call all destruction callbacks, before we start cleaning up our state. while (!onDestructionCallbacks_.empty()) { LoopCallback* callback = &onDestructionCallbacks_.front(); @@ -185,24 +169,33 @@ EventBase::~EventBase() { callback->runLoopCallback(); } - // Delete any unfired CobTimeout objects, so that we don't leak memory - // (Note that we don't fire them. The caller is responsible for cleaning up - // its own data structures if it destroys the EventBase with unfired events - // remaining.) - while (!pendingCobTimeouts_.empty()) { - CobTimeout* timeout = &pendingCobTimeouts_.front(); - delete timeout; - } + clearCobTimeouts(); + + DCHECK_EQ(0, runBeforeLoopCallbacks_.size()); - (void) runLoopCallbacks(false); + (void)runLoopCallbacks(); + + if (!fnRunner_->consumeUntilDrained()) { + LOG(ERROR) << "~EventBase(): Unable to drain notification queue"; + } // Stop consumer before deleting NotificationQueue fnRunner_->stopConsuming(); - event_base_free(evb_); + { + std::lock_guard lock(libevent_mutex_); + event_base_free(evb_); + } + + { + std::lock_guard lock(localStorageMutex_); + for (auto storage : localStorageToDtor_) { + storage->onEventBaseDestruction(*this); + } + } VLOG(5) << "EventBase(): Destroyed."; } -int EventBase::getNotificationQueueSize() const { +size_t EventBase::getNotificationQueueSize() const { return queue_->size(); } @@ -212,9 +205,10 @@ void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) { // Set smoothing coefficient for loop load average; input is # of milliseconds // for exp(-1) decay. -void EventBase::setLoadAvgMsec(uint32_t ms) { - uint64_t us = 1000 * ms; - if (ms > 0) { +void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) { + assert(enableTimeMeasurement_); + std::chrono::microseconds us = std::chrono::milliseconds(ms); + if (ms > std::chrono::milliseconds::zero()) { maxLatencyLoopTime_.setTimeInterval(us); avgLoopTime_.setTimeInterval(us); } else { @@ -223,6 +217,7 @@ void EventBase::setLoadAvgMsec(uint32_t ms) { } void EventBase::resetLoadAvg(double value) { + assert(enableTimeMeasurement_); avgLoopTime_.reset(value); maxLatencyLoopTime_.reset(value); } @@ -252,25 +247,56 @@ bool EventBase::loopOnce(int flags) { bool EventBase::loopBody(int flags) { VLOG(5) << "EventBase(): Starting loop."; + + DCHECK(!invokingLoop_) + << "Your code just tried to loop over an event base from inside another " + << "event base loop. Since libevent is not reentrant, this leads to " + << "undefined behavior in opt builds. Please fix immediately. For the " + << "common case of an inner function that needs to do some synchronous " + << "computation on an event-base, replace getEventBase() by a new, " + << "stack-allocated EvenBase."; + invokingLoop_ = true; + SCOPE_EXIT { + invokingLoop_ = false; + }; + int res = 0; bool ranLoopCallbacks; bool blocking = !(flags & EVLOOP_NONBLOCK); bool once = (flags & EVLOOP_ONCE); + // time-measurement variables. + std::chrono::steady_clock::time_point prev; + int64_t idleStart = 0; + int64_t busy; + int64_t idle; + loopThread_.store(pthread_self(), std::memory_order_release); if (!name_.empty()) { setThreadName(name_); } - auto prev = std::chrono::steady_clock::now(); - int64_t idleStart = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count(); + if (enableTimeMeasurement_) { + prev = std::chrono::steady_clock::now(); + idleStart = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + } - // TODO: Read stop_ atomically with an acquire barrier. - while (!stop_) { + while (!stop_.load(std::memory_order_acquire)) { + applyLoopKeepAlive(); ++nextLoopCnt_; + // Run the before loop callbacks + LoopCallbackList callbacks; + callbacks.swap(runBeforeLoopCallbacks_); + + while(!callbacks.empty()) { + auto* item = &callbacks.front(); + callbacks.pop_front(); + item->runLoopCallback(); + } + // nobody can add loop callbacks from within this thread if // we don't have to handle anything to start with... if (blocking && loopCallbacks_.empty()) { @@ -278,44 +304,53 @@ bool EventBase::loopBody(int flags) { } else { res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK); } - ranLoopCallbacks = runLoopCallbacks(); - int64_t busy = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count() - startWork_; - int64_t idle = startWork_ - idleStart; + ranLoopCallbacks = runLoopCallbacks(); - avgLoopTime_.addSample(idle, busy); - maxLatencyLoopTime_.addSample(idle, busy); + if (enableTimeMeasurement_) { + busy = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count() - + startWork_; + idle = startWork_ - idleStart; + + avgLoopTime_.addSample(std::chrono::microseconds(idle), + std::chrono::microseconds(busy)); + maxLatencyLoopTime_.addSample(std::chrono::microseconds(idle), + std::chrono::microseconds(busy)); + + if (observer_) { + if (observerSampleCount_++ == observer_->getSampleRate()) { + observerSampleCount_ = 0; + observer_->loopSample(busy, idle); + } + } - if (observer_) { - if (observerSampleCount_++ == observer_->getSampleRate()) { - observerSampleCount_ = 0; - observer_->loopSample(busy, idle); + VLOG(11) << "EventBase " << this << " did not timeout " + " loop time guess: " << busy + idle << + " idle time: " << idle << + " busy time: " << busy << + " avgLoopTime: " << avgLoopTime_.get() << + " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() << + " maxLatency_: " << maxLatency_ << + " notificationQueueSize: " << getNotificationQueueSize() << + " nothingHandledYet(): "<< nothingHandledYet(); + + // see if our average loop time has exceeded our limit + if ((maxLatency_ > 0) && + (maxLatencyLoopTime_.get() > double(maxLatency_))) { + maxLatencyCob_(); + // back off temporarily -- don't keep spamming maxLatencyCob_ + // if we're only a bit over the limit + maxLatencyLoopTime_.dampen(0.9); } - } - VLOG(11) << "EventBase " << this << " did not timeout " - " loop time guess: " << busy + idle << - " idle time: " << idle << - " busy time: " << busy << - " avgLoopTime: " << avgLoopTime_.get() << - " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() << - " maxLatency_: " << maxLatency_ << - " nothingHandledYet(): "<< nothingHandledYet(); - - // see if our average loop time has exceeded our limit - if ((maxLatency_ > 0) && - (maxLatencyLoopTime_.get() > double(maxLatency_))) { - maxLatencyCob_(); - // back off temporarily -- don't keep spamming maxLatencyCob_ - // if we're only a bit over the limit - maxLatencyLoopTime_.dampen(0.9); + // Our loop run did real work; reset the idle timer + idleStart = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + } else { + VLOG(11) << "EventBase " << this << " did not timeout"; } - // Our loop run did real work; reset the idle timer - idleStart = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count(); - // If the event loop indicate that there were no more events, and // we also didn't have any loop callbacks to run, there is nothing left to // do. @@ -330,8 +365,10 @@ bool EventBase::loopBody(int flags) { } } - VLOG(5) << "EventBase " << this << " loop time: " << - getTimeDelta(&prev).count(); + if (enableTimeMeasurement_) { + VLOG(5) << "EventBase " << this << " loop time: " << + getTimeDelta(&prev).count(); + } if (once) { break; @@ -350,53 +387,80 @@ bool EventBase::loopBody(int flags) { return false; } - loopThread_.store(0, std::memory_order_release); + loopThread_.store({}, std::memory_order_release); VLOG(5) << "EventBase(): Done with loop."; return true; } -void EventBase::loopForever() { - // Update the notification queue event to treat it as a normal (non-internal) - // event. The notification queue event always remains installed, and the main - // loop won't exit with it installed. - fnRunner_->stopConsuming(); - fnRunner_->startConsuming(this, queue_.get()); +ssize_t EventBase::loopKeepAliveCount() { + if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) { + loopKeepAliveCount_ += + loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed); + } + DCHECK_GE(loopKeepAliveCount_, 0); + return loopKeepAliveCount_; +} - bool ret = loop(); +void EventBase::applyLoopKeepAlive() { + if (loopKeepAliveActive_ && loopKeepAliveCount() == 0) { + // Restore the notification queue internal flag + fnRunner_->stopConsuming(); + fnRunner_->startConsumingInternal(this, queue_.get()); + loopKeepAliveActive_ = false; + } else if (!loopKeepAliveActive_ && loopKeepAliveCount() > 0) { + // Update the notification queue event to treat it as a normal + // (non-internal) event. The notification queue event always remains + // installed, and the main loop won't exit with it installed. + fnRunner_->stopConsuming(); + fnRunner_->startConsuming(this, queue_.get()); + loopKeepAliveActive_ = true; + } +} - // Restore the notification queue internal flag - fnRunner_->stopConsuming(); - fnRunner_->startConsumingInternal(this, queue_.get()); +void EventBase::loopForever() { + bool ret; + { + SCOPE_EXIT { + applyLoopKeepAlive(); + }; + // Make sure notification queue events are treated as normal events. + // We can't use loopKeepAlive() here since LoopKeepAlive token can only be + // released inside a loop. + ++loopKeepAliveCount_; + SCOPE_EXIT { + --loopKeepAliveCount_; + }; + ret = loop(); + } if (!ret) { folly::throwSystemError("error in EventBase::loopForever()"); } } -bool EventBase::bumpHandlingTime() { +void EventBase::bumpHandlingTime() { + if (!enableTimeMeasurement_) { + return; + } + VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ << " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_; - if(nothingHandledYet()) { + if (nothingHandledYet()) { latestLoopCnt_ = nextLoopCnt_; // set the time startWork_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()).count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); - VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ << - " (loop) startWork_ " << startWork_; - return true; + VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ + << " (loop) startWork_ " << startWork_; } - return false; } void EventBase::terminateLoopSoon() { VLOG(5) << "EventBase(): Received terminateLoopSoon() command."; - if (!isRunning()) { - return; - } - // Set stop to true, so the event loop will know to exit. // TODO: We should really use an atomic operation here with a release // barrier. @@ -416,7 +480,7 @@ void EventBase::terminateLoopSoon() { // this likely means the EventBase already has lots of events waiting // anyway. try { - queue_->putMessage(std::make_pair(nullptr, nullptr)); + queue_->putMessage(nullptr); } catch (...) { // We don't care if putMessage() fails. This likely means // the EventBase already has lots of events waiting anyway. @@ -434,9 +498,9 @@ void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) { } } -void EventBase::runInLoop(const Cob& cob, bool thisIteration) { +void EventBase::runInLoop(Func cob, bool thisIteration) { DCHECK(isInEventBaseThread()); - auto wrapper = new FunctionLoopCallback(cob); + auto wrapper = new FunctionLoopCallback(std::move(cob)); wrapper->context_ = RequestContext::saveContext(); if (runOnceCallbacks_ != nullptr && thisIteration) { runOnceCallbacks_->push_back(*wrapper); @@ -445,24 +509,19 @@ void EventBase::runInLoop(const Cob& cob, bool thisIteration) { } } -void EventBase::runInLoop(Cob&& cob, bool thisIteration) { - DCHECK(isInEventBaseThread()); - auto wrapper = new FunctionLoopCallback(std::move(cob)); - wrapper->context_ = RequestContext::saveContext(); - if (runOnceCallbacks_ != nullptr && thisIteration) { - runOnceCallbacks_->push_back(*wrapper); - } else { - loopCallbacks_.push_back(*wrapper); - } +void EventBase::runOnDestruction(LoopCallback* callback) { + std::lock_guard lg(onDestructionCallbacksMutex_); + callback->cancelLoopCallback(); + onDestructionCallbacks_.push_back(*callback); } -void EventBase::runOnDestruction(LoopCallback* callback) { +void EventBase::runBeforeLoop(LoopCallback* callback) { DCHECK(isInEventBaseThread()); callback->cancelLoopCallback(); - onDestructionCallbacks_.push_back(*callback); + runBeforeLoopCallbacks_.push_back(*callback); } -bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) { +bool EventBase::runInEventBaseThread(Func fn) { // Send the message. // It will be received by the FunctionRunner in the EventBase's thread. @@ -475,62 +534,59 @@ bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) { // Short-circuit if we are already in our event base if (inRunningEventBaseThread()) { - runInLoop(new RunInLoopCallback(fn, arg)); + runInLoop(std::move(fn)); return true; } try { - queue_->putMessage(std::make_pair(fn, arg)); + queue_->putMessage(std::move(fn)); } catch (const std::exception& ex) { LOG(ERROR) << "EventBase " << this << ": failed to schedule function " - << fn << "for EventBase thread: " << ex.what(); + << "for EventBase thread: " << ex.what(); return false; } return true; } -bool EventBase::runInEventBaseThread(const Cob& fn) { - // Short-circuit if we are already in our event base +bool EventBase::runInEventBaseThreadAndWait(FuncRef fn) { if (inRunningEventBaseThread()) { - runInLoop(fn); - return true; - } - - Cob* fnCopy; - // Allocate a copy of the function so we can pass it to the other thread - // The other thread will delete this copy once the function has been run - try { - fnCopy = new Cob(fn); - } catch (const std::bad_alloc& ex) { - LOG(ERROR) << "failed to allocate tr::function copy " - << "for runInEventBaseThread()"; + LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not " + << "allowed"; return false; } - if (!runInEventBaseThread(&EventBase::runFunctionPtr, fnCopy)) { - delete fnCopy; - return false; - } + bool ready = false; + std::mutex m; + std::condition_variable cv; + runInEventBaseThread([&] { + SCOPE_EXIT { + std::unique_lock l(m); + ready = true; + cv.notify_one(); + // We cannot release the lock before notify_one, because a spurious + // wakeup in the waiting thread may lead to cv and m going out of scope + // prematurely. + }; + fn(); + }); + std::unique_lock l(m); + cv.wait(l, [&] { return ready; }); return true; } -bool EventBase::runAfterDelay(const Cob& cob, - int milliseconds, - TimeoutManager::InternalEnum in) { - CobTimeout* timeout = new CobTimeout(this, cob, in); - if (!timeout->scheduleTimeout(milliseconds)) { - delete timeout; - return false; +bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(FuncRef fn) { + if (isInEventBaseThread()) { + fn(); + return true; + } else { + return runInEventBaseThreadAndWait(std::move(fn)); } - - pendingCobTimeouts_.push_back(*timeout); - return true; } -bool EventBase::runLoopCallbacks(bool setContext) { +bool EventBase::runLoopCallbacks() { if (!loopCallbacks_.empty()) { bumpHandlingTime(); // Swap the loopCallbacks_ list with a temporary list on our stack. @@ -548,9 +604,7 @@ bool EventBase::runLoopCallbacks(bool setContext) { while (!currentCallbacks.empty()) { LoopCallback* callback = ¤tCallbacks.front(); currentCallbacks.pop_front(); - if (setContext) { - RequestContext::setContext(callback->context_); - } + folly::RequestContextScopeGuard rctx(callback->context_); callback->runLoopCallback(); } @@ -562,7 +616,7 @@ bool EventBase::runLoopCallbacks(bool setContext) { void EventBase::initNotificationQueue() { // Infinite size queue - queue_.reset(new NotificationQueue>()); + queue_.reset(new NotificationQueue()); // We allocate fnRunner_ separately, rather than declaring it directly // as a member of EventBase solely so that we don't need to include @@ -581,8 +635,9 @@ void EventBase::initNotificationQueue() { fnRunner_->startConsumingInternal(this, queue_.get()); } -void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) { - expCoeff_ = -1.0/timeInterval; +void EventBase::SmoothLoopTime::setTimeInterval( + std::chrono::microseconds timeInterval) { + expCoeff_ = -1.0 / timeInterval.count(); VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__; } @@ -590,65 +645,39 @@ void EventBase::SmoothLoopTime::reset(double value) { value_ = value; } -void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) { - /* - * Position at which the busy sample is considered to be taken. - * (Allows to quickly skew our average without editing much code) - */ - enum BusySamplePosition { - RIGHT = 0, // busy sample placed at the end of the iteration - CENTER = 1, // busy sample placed at the middle point of the iteration - LEFT = 2, // busy sample placed at the beginning of the iteration - }; - - VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ << - " idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ << - " busy " << busy << " " << __PRETTY_FUNCTION__; +void EventBase::SmoothLoopTime::addSample( + std::chrono::microseconds idle, + std::chrono::microseconds busy) { + /* + * Position at which the busy sample is considered to be taken. + * (Allows to quickly skew our average without editing much code) + */ + enum BusySamplePosition { + RIGHT = 0, // busy sample placed at the end of the iteration + CENTER = 1, // busy sample placed at the middle point of the iteration + LEFT = 2, // busy sample placed at the beginning of the iteration + }; + + // See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average + // and D676020 for more info on this calculation. + VLOG(11) << "idle " << idle.count() << " oldBusyLeftover_ " + << oldBusyLeftover_.count() << " idle + oldBusyLeftover_ " + << (idle + oldBusyLeftover_).count() << " busy " << busy.count() + << " " << __PRETTY_FUNCTION__; idle += oldBusyLeftover_ + busy; oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2; idle -= oldBusyLeftover_; - double coeff = exp(idle * expCoeff_); + double coeff = exp(idle.count() * expCoeff_); value_ *= coeff; - value_ += (1.0 - coeff) * busy; + value_ += (1.0 - coeff) * busy.count(); } -bool EventBase::nothingHandledYet() { +bool EventBase::nothingHandledYet() const noexcept { VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_; return (nextLoopCnt_ != latestLoopCnt_); } -/* static */ -void EventBase::runFunctionPtr(Cob* fn) { - // The function should never throw an exception, because we have no - // way of knowing what sort of error handling to perform. - // - // If it does throw, log a message and abort the program. - try { - (*fn)(); - } catch (const std::exception &ex) { - LOG(ERROR) << "runInEventBaseThread() std::function threw a " - << typeid(ex).name() << " exception: " << ex.what(); - abort(); - } catch (...) { - LOG(ERROR) << "runInEventBaseThread() std::function threw an exception"; - abort(); - } - - // The function object was allocated by runInEventBaseThread(). - // Delete it once it has been run. - delete fn; -} - -EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg) - : fn_(fn) - , arg_(arg) {} - -void EventBase::RunInLoopCallback::runLoopCallback() noexcept { - fn_(arg_); - delete this; -} - void EventBase::attachTimeoutManager(AsyncTimeout* obj, InternalEnum internal) { @@ -658,7 +687,7 @@ void EventBase::attachTimeoutManager(AsyncTimeout* obj, event_base_set(getLibeventBase(), ev); if (internal == AsyncTimeout::InternalEnum::INTERNAL) { // Set the EVLIST_INTERNAL flag - ev->ev_flags |= EVLIST_INTERNAL; + event_ref_flags(ev) |= EVLIST_INTERNAL; } } @@ -669,12 +698,12 @@ void EventBase::detachTimeoutManager(AsyncTimeout* obj) { } bool EventBase::scheduleTimeout(AsyncTimeout* obj, - std::chrono::milliseconds timeout) { + TimeoutManager::timeout_type timeout) { assert(isInEventBaseThread()); // Set up the timeval and add the event struct timeval tv; - tv.tv_sec = timeout.count() / 1000LL; - tv.tv_usec = (timeout.count() % 1000LL) * 1000LL; + tv.tv_sec = long(timeout.count() / 1000LL); + tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL); struct event* ev = obj->getEvent(); if (event_add(ev, &tv) < 0) { @@ -708,4 +737,7 @@ const std::string& EventBase::getName() { return name_; } +const char* EventBase::getLibeventVersion() { return event_get_version(); } +const char* EventBase::getLibeventMethod() { return event_get_method(); } + } // folly