340b2100d3d7a0826a3e43d6b95e63898422cbdd
[folly.git] / folly / io / async / EventBase.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include <folly/io/async/EventBase.h>
22
23 #include <fcntl.h>
24
25 #include <memory>
26 #include <mutex>
27 #include <thread>
28
29 #include <folly/Baton.h>
30 #include <folly/Memory.h>
31 #include <folly/io/async/NotificationQueue.h>
32 #include <folly/io/async/VirtualEventBase.h>
33 #include <folly/portability/Unistd.h>
34 #include <folly/system/ThreadName.h>
35
36 namespace folly {
37
38 /*
39  * EventBase::FunctionRunner
40  */
41
42 class EventBase::FunctionRunner
43     : public NotificationQueue<EventBase::Func>::Consumer {
44  public:
45   void messageAvailable(Func&& msg) noexcept override {
46     // In libevent2, internal events do not break the loop.
47     // Most users would expect loop(), followed by runInEventBaseThread(),
48     // to break the loop and check if it should exit or not.
49     // To have similar bejaviour to libevent1.4, tell the loop to break here.
50     // Note that loop() may still continue to loop, but it will also check the
51     // stop_ flag as well as runInLoop callbacks, etc.
52     event_base_loopbreak(getEventBase()->evb_);
53
54     if (!msg) {
55       // terminateLoopSoon() sends a null message just to
56       // wake up the loop.  We can ignore these messages.
57       return;
58     }
59     msg();
60   }
61 };
62
63 // The interface used to libevent is not thread-safe.  Calls to
64 // event_init() and event_base_free() directly modify an internal
65 // global 'current_base', so a mutex is required to protect this.
66 //
67 // event_init() should only ever be called once.  Subsequent calls
68 // should be made to event_base_new().  We can recognise that
69 // event_init() has already been called by simply inspecting current_base.
70 static std::mutex libevent_mutex_;
71
72 /*
73  * EventBase methods
74  */
75
76 EventBase::EventBase(bool enableTimeMeasurement)
77   : runOnceCallbacks_(nullptr)
78   , stop_(false)
79   , loopThread_()
80   , queue_(nullptr)
81   , fnRunner_(nullptr)
82   , maxLatency_(0)
83   , avgLoopTime_(std::chrono::seconds(2))
84   , maxLatencyLoopTime_(avgLoopTime_)
85   , enableTimeMeasurement_(enableTimeMeasurement)
86   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
87   , latestLoopCnt_(nextLoopCnt_)
88   , startWork_()
89   , observer_(nullptr)
90   , observerSampleCount_(0)
91   , executionObserver_(nullptr) {
92   struct event ev;
93   {
94     std::lock_guard<std::mutex> lock(libevent_mutex_);
95
96     // The value 'current_base' (libevent 1) or
97     // 'event_global_current_base_' (libevent 2) is filled in by event_set(),
98     // allowing examination of its value without an explicit reference here.
99     // If ev.ev_base is nullptr, then event_init() must be called, otherwise
100     // call event_base_new().
101     event_set(&ev, 0, 0, nullptr, nullptr);
102     if (!ev.ev_base) {
103       evb_ = event_init();
104     }
105   }
106
107   if (ev.ev_base) {
108     evb_ = event_base_new();
109   }
110
111   if (UNLIKELY(evb_ == nullptr)) {
112     LOG(ERROR) << "EventBase(): Failed to init event base.";
113     folly::throwSystemError("error in EventBase::EventBase()");
114   }
115   VLOG(5) << "EventBase(): Created.";
116   initNotificationQueue();
117 }
118
119 // takes ownership of the event_base
120 EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
121   : runOnceCallbacks_(nullptr)
122   , stop_(false)
123   , loopThread_()
124   , evb_(evb)
125   , queue_(nullptr)
126   , fnRunner_(nullptr)
127   , maxLatency_(0)
128   , avgLoopTime_(std::chrono::seconds(2))
129   , maxLatencyLoopTime_(avgLoopTime_)
130   , enableTimeMeasurement_(enableTimeMeasurement)
131   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
132   , latestLoopCnt_(nextLoopCnt_)
133   , startWork_()
134   , observer_(nullptr)
135   , observerSampleCount_(0)
136   , executionObserver_(nullptr) {
137   if (UNLIKELY(evb_ == nullptr)) {
138     LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
139     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
140   }
141   initNotificationQueue();
142 }
143
144 EventBase::~EventBase() {
145   std::future<void> virtualEventBaseDestroyFuture;
146   if (virtualEventBase_) {
147     virtualEventBaseDestroyFuture = virtualEventBase_->destroy();
148   }
149
150   // Keep looping until all keep-alive handles are released. Each keep-alive
151   // handle signals that some external code will still schedule some work on
152   // this EventBase (so it's not safe to destroy it).
153   while (loopKeepAliveCount() > 0) {
154     applyLoopKeepAlive();
155     loopOnce();
156   }
157
158   if (virtualEventBaseDestroyFuture.valid()) {
159     virtualEventBaseDestroyFuture.get();
160   }
161
162   // Call all destruction callbacks, before we start cleaning up our state.
163   while (!onDestructionCallbacks_.empty()) {
164     LoopCallback* callback = &onDestructionCallbacks_.front();
165     onDestructionCallbacks_.pop_front();
166     callback->runLoopCallback();
167   }
168
169   clearCobTimeouts();
170
171   DCHECK_EQ(0u, runBeforeLoopCallbacks_.size());
172
173   (void)runLoopCallbacks();
174
175   if (!fnRunner_->consumeUntilDrained()) {
176     LOG(ERROR) << "~EventBase(): Unable to drain notification queue";
177   }
178
179   // Stop consumer before deleting NotificationQueue
180   fnRunner_->stopConsuming();
181   {
182     std::lock_guard<std::mutex> lock(libevent_mutex_);
183     event_base_free(evb_);
184   }
185
186   for (auto storage : localStorageToDtor_) {
187     storage->onEventBaseDestruction(*this);
188   }
189
190   VLOG(5) << "EventBase(): Destroyed.";
191 }
192
193 size_t EventBase::getNotificationQueueSize() const {
194   return queue_->size();
195 }
196
197 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
198   fnRunner_->setMaxReadAtOnce(maxAtOnce);
199 }
200
201 void EventBase::checkIsInEventBaseThread() const {
202   auto evbTid = loopThread_.load(std::memory_order_relaxed);
203   if (evbTid == std::thread::id()) {
204     return;
205   }
206
207   // Using getThreadName(evbTid) instead of name_ will work also if
208   // the thread name is set outside of EventBase (and name_ is empty).
209   auto curTid = std::this_thread::get_id();
210   CHECK(evbTid == curTid)
211       << "This logic must be executed in the event base thread. "
212       << "Event base thread name: \""
213       << folly::getThreadName(evbTid).value_or("")
214       << "\", current thread name: \""
215       << folly::getThreadName(curTid).value_or("") << "\"";
216 }
217
218 // Set smoothing coefficient for loop load average; input is # of milliseconds
219 // for exp(-1) decay.
220 void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
221   assert(enableTimeMeasurement_);
222   std::chrono::microseconds us = std::chrono::milliseconds(ms);
223   if (ms > std::chrono::milliseconds::zero()) {
224     maxLatencyLoopTime_.setTimeInterval(us);
225     avgLoopTime_.setTimeInterval(us);
226   } else {
227     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
228   }
229 }
230
231 void EventBase::resetLoadAvg(double value) {
232   assert(enableTimeMeasurement_);
233   avgLoopTime_.reset(value);
234   maxLatencyLoopTime_.reset(value);
235 }
236
237 static std::chrono::milliseconds
238 getTimeDelta(std::chrono::steady_clock::time_point* prev) {
239   auto result = std::chrono::steady_clock::now() - *prev;
240   *prev = std::chrono::steady_clock::now();
241
242   return std::chrono::duration_cast<std::chrono::milliseconds>(result);
243 }
244
245 void EventBase::waitUntilRunning() {
246   while (!isRunning()) {
247     std::this_thread::yield();
248   }
249 }
250
251 // enters the event_base loop -- will only exit when forced to
252 bool EventBase::loop() {
253   return loopBody();
254 }
255
256 bool EventBase::loopOnce(int flags) {
257   return loopBody(flags | EVLOOP_ONCE);
258 }
259
260 bool EventBase::loopBody(int flags) {
261   VLOG(5) << "EventBase(): Starting loop.";
262
263   DCHECK(!invokingLoop_)
264       << "Your code just tried to loop over an event base from inside another "
265       << "event base loop. Since libevent is not reentrant, this leads to "
266       << "undefined behavior in opt builds. Please fix immediately. For the "
267       << "common case of an inner function that needs to do some synchronous "
268       << "computation on an event-base, replace getEventBase() by a new, "
269       << "stack-allocated EvenBase.";
270   invokingLoop_ = true;
271   SCOPE_EXIT {
272     invokingLoop_ = false;
273   };
274
275   int res = 0;
276   bool ranLoopCallbacks;
277   bool blocking = !(flags & EVLOOP_NONBLOCK);
278   bool once = (flags & EVLOOP_ONCE);
279
280   // time-measurement variables.
281   std::chrono::steady_clock::time_point prev;
282   std::chrono::steady_clock::time_point idleStart = {};
283   std::chrono::microseconds busy;
284   std::chrono::microseconds idle;
285
286   loopThread_.store(std::this_thread::get_id(), std::memory_order_release);
287
288   if (!name_.empty()) {
289     setThreadName(name_);
290   }
291
292   if (enableTimeMeasurement_) {
293     prev = std::chrono::steady_clock::now();
294     idleStart = std::chrono::steady_clock::now();
295   }
296
297   while (!stop_.load(std::memory_order_acquire)) {
298     applyLoopKeepAlive();
299     ++nextLoopCnt_;
300
301     // Run the before loop callbacks
302     LoopCallbackList callbacks;
303     callbacks.swap(runBeforeLoopCallbacks_);
304
305     while(!callbacks.empty()) {
306       auto* item = &callbacks.front();
307       callbacks.pop_front();
308       item->runLoopCallback();
309     }
310
311     // nobody can add loop callbacks from within this thread if
312     // we don't have to handle anything to start with...
313     if (blocking && loopCallbacks_.empty()) {
314       res = event_base_loop(evb_, EVLOOP_ONCE);
315     } else {
316       res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
317     }
318
319     ranLoopCallbacks = runLoopCallbacks();
320
321     if (enableTimeMeasurement_) {
322       busy = std::chrono::duration_cast<std::chrono::microseconds>(
323           std::chrono::steady_clock::now() - startWork_);
324       idle = std::chrono::duration_cast<std::chrono::microseconds>(
325           startWork_ - idleStart);
326
327       avgLoopTime_.addSample(std::chrono::microseconds(idle),
328         std::chrono::microseconds(busy));
329       maxLatencyLoopTime_.addSample(std::chrono::microseconds(idle),
330         std::chrono::microseconds(busy));
331
332       if (observer_) {
333         if (observerSampleCount_++ == observer_->getSampleRate()) {
334           observerSampleCount_ = 0;
335           observer_->loopSample(busy.count(), idle.count());
336         }
337       }
338
339       VLOG(11) << "EventBase "  << this         << " did not timeout " <<
340         " loop time guess: "    << (busy + idle).count()  <<
341         " idle time: "          << idle.count()         <<
342         " busy time: "          << busy.count()         <<
343         " avgLoopTime: "        << avgLoopTime_.get() <<
344         " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
345         " maxLatency_: "        << maxLatency_.count() << "us" <<
346         " notificationQueueSize: " << getNotificationQueueSize() <<
347         " nothingHandledYet(): " << nothingHandledYet();
348
349       // see if our average loop time has exceeded our limit
350       if ((maxLatency_ > std::chrono::microseconds::zero()) &&
351           (maxLatencyLoopTime_.get() > double(maxLatency_.count()))) {
352         maxLatencyCob_();
353         // back off temporarily -- don't keep spamming maxLatencyCob_
354         // if we're only a bit over the limit
355         maxLatencyLoopTime_.dampen(0.9);
356       }
357
358       // Our loop run did real work; reset the idle timer
359       idleStart = std::chrono::steady_clock::now();
360     } else {
361       VLOG(11) << "EventBase " << this << " did not timeout";
362     }
363
364     // If the event loop indicate that there were no more events, and
365     // we also didn't have any loop callbacks to run, there is nothing left to
366     // do.
367     if (res != 0 && !ranLoopCallbacks) {
368       // Since Notification Queue is marked 'internal' some events may not have
369       // run.  Run them manually if so, and continue looping.
370       //
371       if (getNotificationQueueSize() > 0) {
372         fnRunner_->handlerReady(0);
373       } else {
374         break;
375       }
376     }
377
378     if (enableTimeMeasurement_) {
379       VLOG(11) << "EventBase " << this << " loop time: " <<
380         getTimeDelta(&prev).count();
381     }
382
383     if (once) {
384       break;
385     }
386   }
387   // Reset stop_ so loop() can be called again
388   stop_ = false;
389
390   if (res < 0) {
391     LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
392     return false;
393   } else if (res == 1) {
394     VLOG(5) << "EventBase: ran out of events (exiting loop)!";
395   } else if (res > 1) {
396     LOG(ERROR) << "EventBase: unknown event loop result = " << res;
397     return false;
398   }
399
400   loopThread_.store({}, std::memory_order_release);
401
402   VLOG(5) << "EventBase(): Done with loop.";
403   return true;
404 }
405
406 ssize_t EventBase::loopKeepAliveCount() {
407   if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) {
408     loopKeepAliveCount_ +=
409         loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed);
410   }
411   DCHECK_GE(loopKeepAliveCount_, 0);
412
413   return loopKeepAliveCount_;
414 }
415
416 void EventBase::applyLoopKeepAlive() {
417   auto keepAliveCount = loopKeepAliveCount();
418   // Make sure default VirtualEventBase won't hold EventBase::loop() forever.
419   if (virtualEventBase_ && virtualEventBase_->keepAliveCount() == 1) {
420     --keepAliveCount;
421   }
422
423   if (loopKeepAliveActive_ && keepAliveCount == 0) {
424     // Restore the notification queue internal flag
425     fnRunner_->stopConsuming();
426     fnRunner_->startConsumingInternal(this, queue_.get());
427     loopKeepAliveActive_ = false;
428   } else if (!loopKeepAliveActive_ && keepAliveCount > 0) {
429     // Update the notification queue event to treat it as a normal
430     // (non-internal) event.  The notification queue event always remains
431     // installed, and the main loop won't exit with it installed.
432     fnRunner_->stopConsuming();
433     fnRunner_->startConsuming(this, queue_.get());
434     loopKeepAliveActive_ = true;
435   }
436 }
437
438 void EventBase::loopForever() {
439   bool ret;
440   {
441     SCOPE_EXIT {
442       applyLoopKeepAlive();
443     };
444     // Make sure notification queue events are treated as normal events.
445     // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
446     // released inside a loop.
447     ++loopKeepAliveCount_;
448     SCOPE_EXIT {
449       --loopKeepAliveCount_;
450     };
451     ret = loop();
452   }
453
454   if (!ret) {
455     folly::throwSystemError("error in EventBase::loopForever()");
456   }
457 }
458
459 void EventBase::bumpHandlingTime() {
460   if (!enableTimeMeasurement_) {
461     return;
462   }
463
464   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
465     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
466   if (nothingHandledYet()) {
467     latestLoopCnt_ = nextLoopCnt_;
468     // set the time
469     startWork_ = std::chrono::steady_clock::now();
470
471     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
472              << " (loop) startWork_ " << startWork_.time_since_epoch().count();
473   }
474 }
475
476 void EventBase::terminateLoopSoon() {
477   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
478
479   // Set stop to true, so the event loop will know to exit.
480   // TODO: We should really use an atomic operation here with a release
481   // barrier.
482   stop_ = true;
483
484   // Call event_base_loopbreak() so that libevent will exit the next time
485   // around the loop.
486   event_base_loopbreak(evb_);
487
488   // If terminateLoopSoon() is called from another thread,
489   // the EventBase thread might be stuck waiting for events.
490   // In this case, it won't wake up and notice that stop_ is set until it
491   // receives another event.  Send an empty frame to the notification queue
492   // so that the event loop will wake up even if there are no other events.
493   //
494   // We don't care about the return value of trySendFrame().  If it fails
495   // this likely means the EventBase already has lots of events waiting
496   // anyway.
497   try {
498     queue_->putMessage(nullptr);
499   } catch (...) {
500     // We don't care if putMessage() fails.  This likely means
501     // the EventBase already has lots of events waiting anyway.
502   }
503 }
504
505 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
506   dcheckIsInEventBaseThread();
507   callback->cancelLoopCallback();
508   callback->context_ = RequestContext::saveContext();
509   if (runOnceCallbacks_ != nullptr && thisIteration) {
510     runOnceCallbacks_->push_back(*callback);
511   } else {
512     loopCallbacks_.push_back(*callback);
513   }
514 }
515
516 void EventBase::runInLoop(Func cob, bool thisIteration) {
517   dcheckIsInEventBaseThread();
518   auto wrapper = new FunctionLoopCallback(std::move(cob));
519   wrapper->context_ = RequestContext::saveContext();
520   if (runOnceCallbacks_ != nullptr && thisIteration) {
521     runOnceCallbacks_->push_back(*wrapper);
522   } else {
523     loopCallbacks_.push_back(*wrapper);
524   }
525 }
526
527 void EventBase::runOnDestruction(LoopCallback* callback) {
528   std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
529   callback->cancelLoopCallback();
530   onDestructionCallbacks_.push_back(*callback);
531 }
532
533 void EventBase::runBeforeLoop(LoopCallback* callback) {
534   dcheckIsInEventBaseThread();
535   callback->cancelLoopCallback();
536   runBeforeLoopCallbacks_.push_back(*callback);
537 }
538
539 bool EventBase::runInEventBaseThread(Func fn) {
540   // Send the message.
541   // It will be received by the FunctionRunner in the EventBase's thread.
542
543   // We try not to schedule nullptr callbacks
544   if (!fn) {
545     LOG(ERROR) << "EventBase " << this
546                << ": Scheduling nullptr callbacks is not allowed";
547     return false;
548   }
549
550   // Short-circuit if we are already in our event base
551   if (inRunningEventBaseThread()) {
552     runInLoop(std::move(fn));
553     return true;
554   }
555
556   try {
557     queue_->putMessage(std::move(fn));
558   } catch (const std::exception& ex) {
559     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
560                << "for EventBase thread: " << ex.what();
561     return false;
562   }
563
564   return true;
565 }
566
567 bool EventBase::runInEventBaseThreadAndWait(Func fn) {
568   if (inRunningEventBaseThread()) {
569     LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
570                << "allowed";
571     return false;
572   }
573
574   Baton<> ready;
575   runInEventBaseThread([&ready, fn = std::move(fn)]() mutable {
576     SCOPE_EXIT {
577       ready.post();
578     };
579     // A trick to force the stored functor to be executed and then destructed
580     // before posting the baton and waking the waiting thread.
581     copy(std::move(fn))();
582   });
583   ready.wait();
584
585   return true;
586 }
587
588 bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) {
589   if (isInEventBaseThread()) {
590     fn();
591     return true;
592   } else {
593     return runInEventBaseThreadAndWait(std::move(fn));
594   }
595 }
596
597 bool EventBase::runLoopCallbacks() {
598   if (!loopCallbacks_.empty()) {
599     bumpHandlingTime();
600     // Swap the loopCallbacks_ list with a temporary list on our stack.
601     // This way we will only run callbacks scheduled at the time
602     // runLoopCallbacks() was invoked.
603     //
604     // If any of these callbacks in turn call runInLoop() to schedule more
605     // callbacks, those new callbacks won't be run until the next iteration
606     // around the event loop.  This prevents runInLoop() callbacks from being
607     // able to start file descriptor and timeout based events.
608     LoopCallbackList currentCallbacks;
609     currentCallbacks.swap(loopCallbacks_);
610     runOnceCallbacks_ = &currentCallbacks;
611
612     while (!currentCallbacks.empty()) {
613       LoopCallback* callback = &currentCallbacks.front();
614       currentCallbacks.pop_front();
615       folly::RequestContextScopeGuard rctx(callback->context_);
616       callback->runLoopCallback();
617     }
618
619     runOnceCallbacks_ = nullptr;
620     return true;
621   }
622   return false;
623 }
624
625 void EventBase::initNotificationQueue() {
626   // Infinite size queue
627   queue_ = std::make_unique<NotificationQueue<Func>>();
628
629   // We allocate fnRunner_ separately, rather than declaring it directly
630   // as a member of EventBase solely so that we don't need to include
631   // NotificationQueue.h from EventBase.h
632   fnRunner_ = std::make_unique<FunctionRunner>();
633
634   // Mark this as an internal event, so event_base_loop() will return if
635   // there are no other events besides this one installed.
636   //
637   // Most callers don't care about the internal notification queue used by
638   // EventBase.  The queue is always installed, so if we did count the queue as
639   // an active event, loop() would never exit with no more events to process.
640   // Users can use loopForever() if they do care about the notification queue.
641   // (This is useful for EventBase threads that do nothing but process
642   // runInEventBaseThread() notifications.)
643   fnRunner_->startConsumingInternal(this, queue_.get());
644 }
645
646 void EventBase::SmoothLoopTime::setTimeInterval(
647     std::chrono::microseconds timeInterval) {
648   expCoeff_ = -1.0 / timeInterval.count();
649   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
650 }
651
652 void EventBase::SmoothLoopTime::reset(double value) {
653   value_ = value;
654 }
655
656 void EventBase::SmoothLoopTime::addSample(
657     std::chrono::microseconds idle,
658     std::chrono::microseconds busy) {
659   /*
660    * Position at which the busy sample is considered to be taken.
661    * (Allows to quickly skew our average without editing much code)
662    */
663   enum BusySamplePosition {
664     RIGHT = 0, // busy sample placed at the end of the iteration
665     CENTER = 1, // busy sample placed at the middle point of the iteration
666     LEFT = 2, // busy sample placed at the beginning of the iteration
667   };
668
669   // See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
670   // and D676020 for more info on this calculation.
671   VLOG(11) << "idle " << idle.count() << " oldBusyLeftover_ "
672            << oldBusyLeftover_.count() << " idle + oldBusyLeftover_ "
673            << (idle + oldBusyLeftover_).count() << " busy " << busy.count()
674            << " " << __PRETTY_FUNCTION__;
675   idle += oldBusyLeftover_ + busy;
676   oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
677   idle -= oldBusyLeftover_;
678
679   double coeff = exp(idle.count() * expCoeff_);
680   value_ *= coeff;
681   value_ += (1.0 - coeff) * busy.count();
682 }
683
684 bool EventBase::nothingHandledYet() const noexcept {
685   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
686   return (nextLoopCnt_ != latestLoopCnt_);
687 }
688
689 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
690                                       InternalEnum internal) {
691
692   struct event* ev = obj->getEvent();
693   assert(ev->ev_base == nullptr);
694
695   event_base_set(getLibeventBase(), ev);
696   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
697     // Set the EVLIST_INTERNAL flag
698     event_ref_flags(ev) |= EVLIST_INTERNAL;
699   }
700 }
701
702 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
703   cancelTimeout(obj);
704   struct event* ev = obj->getEvent();
705   ev->ev_base = nullptr;
706 }
707
708 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
709                                  TimeoutManager::timeout_type timeout) {
710   dcheckIsInEventBaseThread();
711   // Set up the timeval and add the event
712   struct timeval tv;
713   tv.tv_sec = long(timeout.count() / 1000LL);
714   tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL);
715
716   struct event* ev = obj->getEvent();
717   if (event_add(ev, &tv) < 0) {
718     LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
719     return false;
720   }
721
722   return true;
723 }
724
725 void EventBase::cancelTimeout(AsyncTimeout* obj) {
726   dcheckIsInEventBaseThread();
727   struct event* ev = obj->getEvent();
728   if (EventUtil::isEventRegistered(ev)) {
729     event_del(ev);
730   }
731 }
732
733 void EventBase::setName(const std::string& name) {
734   dcheckIsInEventBaseThread();
735   name_ = name;
736
737   if (isRunning()) {
738     setThreadName(loopThread_.load(std::memory_order_relaxed),
739                   name_);
740   }
741 }
742
743 const std::string& EventBase::getName() {
744   dcheckIsInEventBaseThread();
745   return name_;
746 }
747
748 const char* EventBase::getLibeventVersion() { return event_get_version(); }
749 const char* EventBase::getLibeventMethod() { return event_get_method(); }
750
751 VirtualEventBase& EventBase::getVirtualEventBase() {
752   folly::call_once(virtualEventBaseInitFlag_, [&] {
753     virtualEventBase_ = std::make_unique<VirtualEventBase>(*this);
754   });
755
756   return *virtualEventBase_;
757 }
758
759 } // namespace folly