X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2Fasync%2FEventBase.cpp;h=24883d9b96a2f387439ccaa46265ef7d3239f4c8;hp=2680d96cd90fb5d4368753fa28689f726cebff1e;hb=02cae39d6c0c8f67e788d6f3b282ad22a4240194;hpb=35fcff936a0ba58986269fb05689843f99e89eb5 diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp index 2680d96c..24883d9b 100644 --- a/folly/io/async/EventBase.cpp +++ b/folly/io/async/EventBase.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2016 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,25 +29,6 @@ #include #include -namespace { - -using folly::EventBase; - -class FunctionLoopCallback : public EventBase::LoopCallback { - public: - explicit FunctionLoopCallback(EventBase::Func&& function) - : function_(std::move(function)) {} - - void runLoopCallback() noexcept override { - function_(); - delete this; - } - - private: - EventBase::Func function_; -}; -} - namespace folly { /* @@ -89,28 +70,6 @@ class EventBase::FunctionRunner } }; -/* - * EventBase::CobTimeout methods - */ - -void EventBase::CobTimeout::timeoutExpired() noexcept { - // For now, we just swallow any exceptions that the callback threw. - try { - cob_(); - } catch (const std::exception& ex) { - LOG(ERROR) << "EventBase::runAfterDelay() callback threw " - << typeid(ex).name() << " exception: " << ex.what(); - } catch (...) { - LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception " - << "type"; - } - - // The CobTimeout object was allocated on the heap by runAfterDelay(), - // so delete it now that the it has fired. - delete this; -} - - // The interface used to libevent is not thread-safe. Calls to // event_init() and event_base_free() directly modify an internal // global 'current_base', so a mutex is required to protect this. @@ -131,10 +90,10 @@ EventBase::EventBase(bool enableTimeMeasurement) , queue_(nullptr) , fnRunner_(nullptr) , maxLatency_(0) - , avgLoopTime_(2000000) + , avgLoopTime_(std::chrono::milliseconds(2000000)) , maxLatencyLoopTime_(avgLoopTime_) , enableTimeMeasurement_(enableTimeMeasurement) - , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon + , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon , latestLoopCnt_(nextLoopCnt_) , startWork_(0) , observer_(nullptr) @@ -177,10 +136,10 @@ EventBase::EventBase(event_base* evb, bool enableTimeMeasurement) , queue_(nullptr) , fnRunner_(nullptr) , maxLatency_(0) - , avgLoopTime_(2000000) + , avgLoopTime_(std::chrono::milliseconds(2000000)) , maxLatencyLoopTime_(avgLoopTime_) , enableTimeMeasurement_(enableTimeMeasurement) - , nextLoopCnt_(-40) // Early wrap-around so bugs will manifest soon + , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon , latestLoopCnt_(nextLoopCnt_) , startWork_(0) , observer_(nullptr) @@ -198,7 +157,7 @@ EventBase::~EventBase() { // Keep looping until all keep-alive handles are released. Each keep-alive // handle signals that some external code will still schedule some work on // this EventBase (so it's not safe to destroy it). - while (!loopKeepAlive_.unique()) { + while (loopKeepAliveCount() > 0) { applyLoopKeepAlive(); loopOnce(); } @@ -210,20 +169,11 @@ EventBase::~EventBase() { callback->runLoopCallback(); } - // Delete any unfired callback objects, so that we don't leak memory - // (Note that we don't fire them. The caller is responsible for cleaning up - // its own data structures if it destroys the EventBase with unfired events - // remaining.) - while (!pendingCobTimeouts_.empty()) { - CobTimeout* timeout = &pendingCobTimeouts_.front(); - delete timeout; - } + clearCobTimeouts(); - while (!runBeforeLoopCallbacks_.empty()) { - delete &runBeforeLoopCallbacks_.front(); - } + DCHECK_EQ(0, runBeforeLoopCallbacks_.size()); - (void) runLoopCallbacks(false); + (void)runLoopCallbacks(); if (!fnRunner_->consumeUntilDrained()) { LOG(ERROR) << "~EventBase(): Unable to drain notification queue"; @@ -236,12 +186,6 @@ EventBase::~EventBase() { event_base_free(evb_); } - while (!runAfterDrainCallbacks_.empty()) { - LoopCallback* callback = &runAfterDrainCallbacks_.front(); - runAfterDrainCallbacks_.pop_front(); - callback->runLoopCallback(); - } - { std::lock_guard lock(localStorageMutex_); for (auto storage : localStorageToDtor_) { @@ -251,7 +195,7 @@ EventBase::~EventBase() { VLOG(5) << "EventBase(): Destroyed."; } -int EventBase::getNotificationQueueSize() const { +size_t EventBase::getNotificationQueueSize() const { return queue_->size(); } @@ -261,10 +205,10 @@ void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) { // Set smoothing coefficient for loop load average; input is # of milliseconds // for exp(-1) decay. -void EventBase::setLoadAvgMsec(uint32_t ms) { +void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) { assert(enableTimeMeasurement_); - uint64_t us = 1000 * ms; - if (ms > 0) { + std::chrono::microseconds us = std::chrono::milliseconds(ms); + if (ms > std::chrono::milliseconds::zero()) { maxLatencyLoopTime_.setTimeInterval(us); avgLoopTime_.setTimeInterval(us); } else { @@ -303,6 +247,19 @@ bool EventBase::loopOnce(int flags) { bool EventBase::loopBody(int flags) { VLOG(5) << "EventBase(): Starting loop."; + + DCHECK(!invokingLoop_) + << "Your code just tried to loop over an event base from inside another " + << "event base loop. Since libevent is not reentrant, this leads to " + << "undefined behavior in opt builds. Please fix immediately. For the " + << "common case of an inner function that needs to do some synchronous " + << "computation on an event-base, replace getEventBase() by a new, " + << "stack-allocated EvenBase."; + invokingLoop_ = true; + SCOPE_EXIT { + invokingLoop_ = false; + }; + int res = 0; bool ranLoopCallbacks; bool blocking = !(flags & EVLOOP_NONBLOCK); @@ -356,8 +313,10 @@ bool EventBase::loopBody(int flags) { startWork_; idle = startWork_ - idleStart; - avgLoopTime_.addSample(idle, busy); - maxLatencyLoopTime_.addSample(idle, busy); + avgLoopTime_.addSample(std::chrono::microseconds(idle), + std::chrono::microseconds(busy)); + maxLatencyLoopTime_.addSample(std::chrono::microseconds(idle), + std::chrono::microseconds(busy)); if (observer_) { if (observerSampleCount_++ == observer_->getSampleRate()) { @@ -434,13 +393,22 @@ bool EventBase::loopBody(int flags) { return true; } +ssize_t EventBase::loopKeepAliveCount() { + if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) { + loopKeepAliveCount_ += + loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed); + } + DCHECK_GE(loopKeepAliveCount_, 0); + return loopKeepAliveCount_; +} + void EventBase::applyLoopKeepAlive() { - if (loopKeepAliveActive_ && loopKeepAlive_.unique()) { + if (loopKeepAliveActive_ && loopKeepAliveCount() == 0) { // Restore the notification queue internal flag fnRunner_->stopConsuming(); fnRunner_->startConsumingInternal(this, queue_.get()); loopKeepAliveActive_ = false; - } else if (!loopKeepAliveActive_ && !loopKeepAlive_.unique()) { + } else if (!loopKeepAliveActive_ && loopKeepAliveCount() > 0) { // Update the notification queue event to treat it as a normal // (non-internal) event. The notification queue event always remains // installed, and the main loop won't exit with it installed. @@ -455,11 +423,14 @@ void EventBase::loopForever() { { SCOPE_EXIT { applyLoopKeepAlive(); - loopForeverActive_ = false; }; - loopForeverActive_ = true; // Make sure notification queue events are treated as normal events. - auto loopKeepAlive = loopKeepAlive_; + // We can't use loopKeepAlive() here since LoopKeepAlive token can only be + // released inside a loop. + ++loopKeepAliveCount_; + SCOPE_EXIT { + --loopKeepAliveCount_; + }; ret = loop(); } @@ -538,13 +509,6 @@ void EventBase::runInLoop(Func cob, bool thisIteration) { } } -void EventBase::runAfterDrain(Func cob) { - auto callback = new FunctionLoopCallback(std::move(cob)); - std::lock_guard lg(runAfterDrainCallbacksMutex_); - callback->cancelLoopCallback(); - runAfterDrainCallbacks_.push_back(*callback); -} - void EventBase::runOnDestruction(LoopCallback* callback) { std::lock_guard lg(onDestructionCallbacksMutex_); callback->cancelLoopCallback(); @@ -586,7 +550,7 @@ bool EventBase::runInEventBaseThread(Func fn) { return true; } -bool EventBase::runInEventBaseThreadAndWait(Func fn) { +bool EventBase::runInEventBaseThreadAndWait(FuncRef fn) { if (inRunningEventBaseThread()) { LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not " << "allowed"; @@ -613,7 +577,7 @@ bool EventBase::runInEventBaseThreadAndWait(Func fn) { return true; } -bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) { +bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(FuncRef fn) { if (isInEventBaseThread()) { fn(); return true; @@ -622,30 +586,7 @@ bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) { } } -void EventBase::runAfterDelay( - Func cob, - uint32_t milliseconds, - TimeoutManager::InternalEnum in) { - if (!tryRunAfterDelay(std::move(cob), milliseconds, in)) { - folly::throwSystemError( - "error in EventBase::runAfterDelay(), failed to schedule timeout"); - } -} - -bool EventBase::tryRunAfterDelay( - Func cob, - uint32_t milliseconds, - TimeoutManager::InternalEnum in) { - CobTimeout* timeout = new CobTimeout(this, std::move(cob), in); - if (!timeout->scheduleTimeout(milliseconds)) { - delete timeout; - return false; - } - pendingCobTimeouts_.push_back(*timeout); - return true; -} - -bool EventBase::runLoopCallbacks(bool setContext) { +bool EventBase::runLoopCallbacks() { if (!loopCallbacks_.empty()) { bumpHandlingTime(); // Swap the loopCallbacks_ list with a temporary list on our stack. @@ -663,9 +604,7 @@ bool EventBase::runLoopCallbacks(bool setContext) { while (!currentCallbacks.empty()) { LoopCallback* callback = ¤tCallbacks.front(); currentCallbacks.pop_front(); - if (setContext) { - RequestContext::setContext(callback->context_); - } + folly::RequestContextScopeGuard rctx(callback->context_); callback->runLoopCallback(); } @@ -696,8 +635,9 @@ void EventBase::initNotificationQueue() { fnRunner_->startConsumingInternal(this, queue_.get()); } -void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) { - expCoeff_ = -1.0/timeInterval; +void EventBase::SmoothLoopTime::setTimeInterval( + std::chrono::microseconds timeInterval) { + expCoeff_ = -1.0 / timeInterval.count(); VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__; } @@ -705,29 +645,32 @@ void EventBase::SmoothLoopTime::reset(double value) { value_ = value; } -void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) { - /* - * Position at which the busy sample is considered to be taken. - * (Allows to quickly skew our average without editing much code) - */ - enum BusySamplePosition { - RIGHT = 0, // busy sample placed at the end of the iteration - CENTER = 1, // busy sample placed at the middle point of the iteration - LEFT = 2, // busy sample placed at the beginning of the iteration - }; +void EventBase::SmoothLoopTime::addSample( + std::chrono::microseconds idle, + std::chrono::microseconds busy) { + /* + * Position at which the busy sample is considered to be taken. + * (Allows to quickly skew our average without editing much code) + */ + enum BusySamplePosition { + RIGHT = 0, // busy sample placed at the end of the iteration + CENTER = 1, // busy sample placed at the middle point of the iteration + LEFT = 2, // busy sample placed at the beginning of the iteration + }; // See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average // and D676020 for more info on this calculation. - VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ << - " idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ << - " busy " << busy << " " << __PRETTY_FUNCTION__; + VLOG(11) << "idle " << idle.count() << " oldBusyLeftover_ " + << oldBusyLeftover_.count() << " idle + oldBusyLeftover_ " + << (idle + oldBusyLeftover_).count() << " busy " << busy.count() + << " " << __PRETTY_FUNCTION__; idle += oldBusyLeftover_ + busy; oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2; idle -= oldBusyLeftover_; - double coeff = exp(idle * expCoeff_); + double coeff = exp(idle.count() * expCoeff_); value_ *= coeff; - value_ += (1.0 - coeff) * busy; + value_ += (1.0 - coeff) * busy.count(); } bool EventBase::nothingHandledYet() const noexcept { @@ -759,8 +702,8 @@ bool EventBase::scheduleTimeout(AsyncTimeout* obj, assert(isInEventBaseThread()); // Set up the timeval and add the event struct timeval tv; - tv.tv_sec = timeout.count() / 1000LL; - tv.tv_usec = (timeout.count() % 1000LL) * 1000LL; + tv.tv_sec = long(timeout.count() / 1000LL); + tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL); struct event* ev = obj->getEvent(); if (event_add(ev, &tv) < 0) {