Print expected/actual thread names when running EventBase logic in unexpected thread
[folly.git] / folly / io / async / EventBase.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/VirtualEventBase.h>
23
24 #include <folly/Memory.h>
25 #include <folly/ThreadName.h>
26 #include <folly/io/async/NotificationQueue.h>
27 #include <folly/portability/Unistd.h>
28
29 #include <condition_variable>
30 #include <fcntl.h>
31 #include <mutex>
32 #include <thread>
33
34 namespace folly {
35
36 /*
37  * EventBase::FunctionRunner
38  */
39
40 class EventBase::FunctionRunner
41     : public NotificationQueue<EventBase::Func>::Consumer {
42  public:
43   void messageAvailable(Func&& msg) noexcept override {
44     // In libevent2, internal events do not break the loop.
45     // Most users would expect loop(), followed by runInEventBaseThread(),
46     // to break the loop and check if it should exit or not.
47     // To have similar bejaviour to libevent1.4, tell the loop to break here.
48     // Note that loop() may still continue to loop, but it will also check the
49     // stop_ flag as well as runInLoop callbacks, etc.
50     event_base_loopbreak(getEventBase()->evb_);
51
52     if (!msg) {
53       // terminateLoopSoon() sends a null message just to
54       // wake up the loop.  We can ignore these messages.
55       return;
56     }
57     msg();
58   }
59 };
60
61 // The interface used to libevent is not thread-safe.  Calls to
62 // event_init() and event_base_free() directly modify an internal
63 // global 'current_base', so a mutex is required to protect this.
64 //
65 // event_init() should only ever be called once.  Subsequent calls
66 // should be made to event_base_new().  We can recognise that
67 // event_init() has already been called by simply inspecting current_base.
68 static std::mutex libevent_mutex_;
69
70 /*
71  * EventBase methods
72  */
73
74 EventBase::EventBase(bool enableTimeMeasurement)
75   : runOnceCallbacks_(nullptr)
76   , stop_(false)
77   , loopThread_()
78   , queue_(nullptr)
79   , fnRunner_(nullptr)
80   , maxLatency_(0)
81   , avgLoopTime_(std::chrono::seconds(2))
82   , maxLatencyLoopTime_(avgLoopTime_)
83   , enableTimeMeasurement_(enableTimeMeasurement)
84   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
85   , latestLoopCnt_(nextLoopCnt_)
86   , startWork_()
87   , observer_(nullptr)
88   , observerSampleCount_(0)
89   , executionObserver_(nullptr) {
90   struct event ev;
91   {
92     std::lock_guard<std::mutex> lock(libevent_mutex_);
93
94     // The value 'current_base' (libevent 1) or
95     // 'event_global_current_base_' (libevent 2) is filled in by event_set(),
96     // allowing examination of its value without an explicit reference here.
97     // If ev.ev_base is nullptr, then event_init() must be called, otherwise
98     // call event_base_new().
99     event_set(&ev, 0, 0, nullptr, nullptr);
100     if (!ev.ev_base) {
101       evb_ = event_init();
102     }
103   }
104
105   if (ev.ev_base) {
106     evb_ = event_base_new();
107   }
108
109   if (UNLIKELY(evb_ == nullptr)) {
110     LOG(ERROR) << "EventBase(): Failed to init event base.";
111     folly::throwSystemError("error in EventBase::EventBase()");
112   }
113   VLOG(5) << "EventBase(): Created.";
114   initNotificationQueue();
115   RequestContext::saveContext();
116 }
117
118 // takes ownership of the event_base
119 EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
120   : runOnceCallbacks_(nullptr)
121   , stop_(false)
122   , loopThread_()
123   , evb_(evb)
124   , queue_(nullptr)
125   , fnRunner_(nullptr)
126   , maxLatency_(0)
127   , avgLoopTime_(std::chrono::seconds(2))
128   , maxLatencyLoopTime_(avgLoopTime_)
129   , enableTimeMeasurement_(enableTimeMeasurement)
130   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
131   , latestLoopCnt_(nextLoopCnt_)
132   , startWork_()
133   , observer_(nullptr)
134   , observerSampleCount_(0)
135   , executionObserver_(nullptr) {
136   if (UNLIKELY(evb_ == nullptr)) {
137     LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
138     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
139   }
140   initNotificationQueue();
141   RequestContext::saveContext();
142 }
143
144 EventBase::~EventBase() {
145   std::future<void> virtualEventBaseDestroyFuture;
146   if (virtualEventBase_) {
147     virtualEventBaseDestroyFuture = virtualEventBase_->destroy();
148   }
149
150   // Keep looping until all keep-alive handles are released. Each keep-alive
151   // handle signals that some external code will still schedule some work on
152   // this EventBase (so it's not safe to destroy it).
153   while (loopKeepAliveCount() > 0) {
154     applyLoopKeepAlive();
155     loopOnce();
156   }
157
158   if (virtualEventBaseDestroyFuture.valid()) {
159     virtualEventBaseDestroyFuture.get();
160   }
161
162   // Call all destruction callbacks, before we start cleaning up our state.
163   while (!onDestructionCallbacks_.empty()) {
164     LoopCallback* callback = &onDestructionCallbacks_.front();
165     onDestructionCallbacks_.pop_front();
166     callback->runLoopCallback();
167   }
168
169   clearCobTimeouts();
170
171   DCHECK_EQ(0u, runBeforeLoopCallbacks_.size());
172
173   (void)runLoopCallbacks();
174
175   if (!fnRunner_->consumeUntilDrained()) {
176     LOG(ERROR) << "~EventBase(): Unable to drain notification queue";
177   }
178
179   // Stop consumer before deleting NotificationQueue
180   fnRunner_->stopConsuming();
181   {
182     std::lock_guard<std::mutex> lock(libevent_mutex_);
183     event_base_free(evb_);
184   }
185
186   for (auto storage : localStorageToDtor_) {
187     storage->onEventBaseDestruction(*this);
188   }
189
190   VLOG(5) << "EventBase(): Destroyed.";
191 }
192
193 size_t EventBase::getNotificationQueueSize() const {
194   return queue_->size();
195 }
196
197 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
198   fnRunner_->setMaxReadAtOnce(maxAtOnce);
199 }
200
201 void EventBase::checkIsInEventBaseThread() const {
202   auto evbTid = loopThread_.load(std::memory_order_relaxed);
203   if (evbTid == std::thread::id()) {
204     return;
205   }
206
207   // Using getThreadName(evbTid) instead of name_ will work also if
208   // the thread name is set outside of EventBase (and name_ is empty).
209   auto curTid = std::this_thread::get_id();
210   CHECK(evbTid == curTid)
211       << "This logic must be executed in the event base thread. "
212       << "Event base thread name: \""
213       << folly::getThreadName(evbTid).value_or("")
214       << "\", current thread name: \""
215       << folly::getThreadName(curTid).value_or("") << "\"";
216 }
217
218 // Set smoothing coefficient for loop load average; input is # of milliseconds
219 // for exp(-1) decay.
220 void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
221   assert(enableTimeMeasurement_);
222   std::chrono::microseconds us = std::chrono::milliseconds(ms);
223   if (ms > std::chrono::milliseconds::zero()) {
224     maxLatencyLoopTime_.setTimeInterval(us);
225     avgLoopTime_.setTimeInterval(us);
226   } else {
227     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
228   }
229 }
230
231 void EventBase::resetLoadAvg(double value) {
232   assert(enableTimeMeasurement_);
233   avgLoopTime_.reset(value);
234   maxLatencyLoopTime_.reset(value);
235 }
236
237 static std::chrono::milliseconds
238 getTimeDelta(std::chrono::steady_clock::time_point* prev) {
239   auto result = std::chrono::steady_clock::now() - *prev;
240   *prev = std::chrono::steady_clock::now();
241
242   return std::chrono::duration_cast<std::chrono::milliseconds>(result);
243 }
244
245 void EventBase::waitUntilRunning() {
246   while (!isRunning()) {
247     std::this_thread::yield();
248   }
249 }
250
251 // enters the event_base loop -- will only exit when forced to
252 bool EventBase::loop() {
253   return loopBody();
254 }
255
256 bool EventBase::loopOnce(int flags) {
257   return loopBody(flags | EVLOOP_ONCE);
258 }
259
260 bool EventBase::loopBody(int flags) {
261   VLOG(5) << "EventBase(): Starting loop.";
262
263   DCHECK(!invokingLoop_)
264       << "Your code just tried to loop over an event base from inside another "
265       << "event base loop. Since libevent is not reentrant, this leads to "
266       << "undefined behavior in opt builds. Please fix immediately. For the "
267       << "common case of an inner function that needs to do some synchronous "
268       << "computation on an event-base, replace getEventBase() by a new, "
269       << "stack-allocated EvenBase.";
270   invokingLoop_ = true;
271   SCOPE_EXIT {
272     invokingLoop_ = false;
273   };
274
275   int res = 0;
276   bool ranLoopCallbacks;
277   bool blocking = !(flags & EVLOOP_NONBLOCK);
278   bool once = (flags & EVLOOP_ONCE);
279
280   // time-measurement variables.
281   std::chrono::steady_clock::time_point prev;
282   std::chrono::steady_clock::time_point idleStart = {};
283   std::chrono::microseconds busy;
284   std::chrono::microseconds idle;
285
286   loopThread_.store(std::this_thread::get_id(), std::memory_order_release);
287
288   if (!name_.empty()) {
289     setThreadName(name_);
290   }
291
292   if (enableTimeMeasurement_) {
293     prev = std::chrono::steady_clock::now();
294     idleStart = std::chrono::steady_clock::now();
295   }
296
297   while (!stop_.load(std::memory_order_acquire)) {
298     applyLoopKeepAlive();
299     ++nextLoopCnt_;
300
301     // Run the before loop callbacks
302     LoopCallbackList callbacks;
303     callbacks.swap(runBeforeLoopCallbacks_);
304
305     while(!callbacks.empty()) {
306       auto* item = &callbacks.front();
307       callbacks.pop_front();
308       item->runLoopCallback();
309     }
310
311     // nobody can add loop callbacks from within this thread if
312     // we don't have to handle anything to start with...
313     if (blocking && loopCallbacks_.empty()) {
314       res = event_base_loop(evb_, EVLOOP_ONCE);
315     } else {
316       res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
317     }
318
319     ranLoopCallbacks = runLoopCallbacks();
320
321     if (enableTimeMeasurement_) {
322       busy = std::chrono::duration_cast<std::chrono::microseconds>(
323           std::chrono::steady_clock::now() - startWork_);
324       idle = std::chrono::duration_cast<std::chrono::microseconds>(
325           startWork_ - idleStart);
326
327       avgLoopTime_.addSample(std::chrono::microseconds(idle),
328         std::chrono::microseconds(busy));
329       maxLatencyLoopTime_.addSample(std::chrono::microseconds(idle),
330         std::chrono::microseconds(busy));
331
332       if (observer_) {
333         if (observerSampleCount_++ == observer_->getSampleRate()) {
334           observerSampleCount_ = 0;
335           observer_->loopSample(busy.count(), idle.count());
336         }
337       }
338
339       VLOG(11) << "EventBase "  << this         << " did not timeout " <<
340         " loop time guess: "    << (busy + idle).count()  <<
341         " idle time: "          << idle.count()         <<
342         " busy time: "          << busy.count()         <<
343         " avgLoopTime: "        << avgLoopTime_.get() <<
344         " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
345         " maxLatency_: "        << maxLatency_.count() << "us" <<
346         " notificationQueueSize: " << getNotificationQueueSize() <<
347         " nothingHandledYet(): " << nothingHandledYet();
348
349       // see if our average loop time has exceeded our limit
350       if ((maxLatency_ > std::chrono::microseconds::zero()) &&
351           (maxLatencyLoopTime_.get() > double(maxLatency_.count()))) {
352         maxLatencyCob_();
353         // back off temporarily -- don't keep spamming maxLatencyCob_
354         // if we're only a bit over the limit
355         maxLatencyLoopTime_.dampen(0.9);
356       }
357
358       // Our loop run did real work; reset the idle timer
359       idleStart = std::chrono::steady_clock::now();
360     } else {
361       VLOG(11) << "EventBase " << this << " did not timeout";
362     }
363
364     // If the event loop indicate that there were no more events, and
365     // we also didn't have any loop callbacks to run, there is nothing left to
366     // do.
367     if (res != 0 && !ranLoopCallbacks) {
368       // Since Notification Queue is marked 'internal' some events may not have
369       // run.  Run them manually if so, and continue looping.
370       //
371       if (getNotificationQueueSize() > 0) {
372         fnRunner_->handlerReady(0);
373       } else {
374         break;
375       }
376     }
377
378     if (enableTimeMeasurement_) {
379       VLOG(11) << "EventBase " << this << " loop time: " <<
380         getTimeDelta(&prev).count();
381     }
382
383     if (once) {
384       break;
385     }
386   }
387   // Reset stop_ so loop() can be called again
388   stop_ = false;
389
390   if (res < 0) {
391     LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
392     return false;
393   } else if (res == 1) {
394     VLOG(5) << "EventBase: ran out of events (exiting loop)!";
395   } else if (res > 1) {
396     LOG(ERROR) << "EventBase: unknown event loop result = " << res;
397     return false;
398   }
399
400   loopThread_.store({}, std::memory_order_release);
401
402   VLOG(5) << "EventBase(): Done with loop.";
403   return true;
404 }
405
406 ssize_t EventBase::loopKeepAliveCount() {
407   if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) {
408     loopKeepAliveCount_ +=
409         loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed);
410   }
411   DCHECK_GE(loopKeepAliveCount_, 0);
412
413   return loopKeepAliveCount_;
414 }
415
416 void EventBase::applyLoopKeepAlive() {
417   auto keepAliveCount = loopKeepAliveCount();
418   // Make sure default VirtualEventBase won't hold EventBase::loop() forever.
419   if (virtualEventBase_ && virtualEventBase_->keepAliveCount() == 1) {
420     --keepAliveCount;
421   }
422
423   if (loopKeepAliveActive_ && keepAliveCount == 0) {
424     // Restore the notification queue internal flag
425     fnRunner_->stopConsuming();
426     fnRunner_->startConsumingInternal(this, queue_.get());
427     loopKeepAliveActive_ = false;
428   } else if (!loopKeepAliveActive_ && keepAliveCount > 0) {
429     // Update the notification queue event to treat it as a normal
430     // (non-internal) event.  The notification queue event always remains
431     // installed, and the main loop won't exit with it installed.
432     fnRunner_->stopConsuming();
433     fnRunner_->startConsuming(this, queue_.get());
434     loopKeepAliveActive_ = true;
435   }
436 }
437
438 void EventBase::loopForever() {
439   bool ret;
440   {
441     SCOPE_EXIT {
442       applyLoopKeepAlive();
443     };
444     // Make sure notification queue events are treated as normal events.
445     // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
446     // released inside a loop.
447     ++loopKeepAliveCount_;
448     SCOPE_EXIT {
449       --loopKeepAliveCount_;
450     };
451     ret = loop();
452   }
453
454   if (!ret) {
455     folly::throwSystemError("error in EventBase::loopForever()");
456   }
457 }
458
459 void EventBase::bumpHandlingTime() {
460   if (!enableTimeMeasurement_) {
461     return;
462   }
463
464   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
465     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
466   if (nothingHandledYet()) {
467     latestLoopCnt_ = nextLoopCnt_;
468     // set the time
469     startWork_ = std::chrono::steady_clock::now();
470
471     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
472              << " (loop) startWork_ " << startWork_.time_since_epoch().count();
473   }
474 }
475
476 void EventBase::terminateLoopSoon() {
477   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
478
479   // Set stop to true, so the event loop will know to exit.
480   // TODO: We should really use an atomic operation here with a release
481   // barrier.
482   stop_ = true;
483
484   // Call event_base_loopbreak() so that libevent will exit the next time
485   // around the loop.
486   event_base_loopbreak(evb_);
487
488   // If terminateLoopSoon() is called from another thread,
489   // the EventBase thread might be stuck waiting for events.
490   // In this case, it won't wake up and notice that stop_ is set until it
491   // receives another event.  Send an empty frame to the notification queue
492   // so that the event loop will wake up even if there are no other events.
493   //
494   // We don't care about the return value of trySendFrame().  If it fails
495   // this likely means the EventBase already has lots of events waiting
496   // anyway.
497   try {
498     queue_->putMessage(nullptr);
499   } catch (...) {
500     // We don't care if putMessage() fails.  This likely means
501     // the EventBase already has lots of events waiting anyway.
502   }
503 }
504
505 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
506   dcheckIsInEventBaseThread();
507   callback->cancelLoopCallback();
508   callback->context_ = RequestContext::saveContext();
509   if (runOnceCallbacks_ != nullptr && thisIteration) {
510     runOnceCallbacks_->push_back(*callback);
511   } else {
512     loopCallbacks_.push_back(*callback);
513   }
514 }
515
516 void EventBase::runInLoop(Func cob, bool thisIteration) {
517   dcheckIsInEventBaseThread();
518   auto wrapper = new FunctionLoopCallback(std::move(cob));
519   wrapper->context_ = RequestContext::saveContext();
520   if (runOnceCallbacks_ != nullptr && thisIteration) {
521     runOnceCallbacks_->push_back(*wrapper);
522   } else {
523     loopCallbacks_.push_back(*wrapper);
524   }
525 }
526
527 void EventBase::runOnDestruction(LoopCallback* callback) {
528   std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
529   callback->cancelLoopCallback();
530   onDestructionCallbacks_.push_back(*callback);
531 }
532
533 void EventBase::runBeforeLoop(LoopCallback* callback) {
534   dcheckIsInEventBaseThread();
535   callback->cancelLoopCallback();
536   runBeforeLoopCallbacks_.push_back(*callback);
537 }
538
539 bool EventBase::runInEventBaseThread(Func fn) {
540   // Send the message.
541   // It will be received by the FunctionRunner in the EventBase's thread.
542
543   // We try not to schedule nullptr callbacks
544   if (!fn) {
545     LOG(ERROR) << "EventBase " << this
546                << ": Scheduling nullptr callbacks is not allowed";
547     return false;
548   }
549
550   // Short-circuit if we are already in our event base
551   if (inRunningEventBaseThread()) {
552     runInLoop(std::move(fn));
553     return true;
554
555   }
556
557   try {
558     queue_->putMessage(std::move(fn));
559   } catch (const std::exception& ex) {
560     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
561                << "for EventBase thread: " << ex.what();
562     return false;
563   }
564
565   return true;
566 }
567
568 bool EventBase::runInEventBaseThreadAndWait(FuncRef fn) {
569   if (inRunningEventBaseThread()) {
570     LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
571                << "allowed";
572     return false;
573   }
574
575   bool ready = false;
576   std::mutex m;
577   std::condition_variable cv;
578   runInEventBaseThread([&] {
579       SCOPE_EXIT {
580         std::unique_lock<std::mutex> l(m);
581         ready = true;
582         cv.notify_one();
583         // We cannot release the lock before notify_one, because a spurious
584         // wakeup in the waiting thread may lead to cv and m going out of scope
585         // prematurely.
586       };
587       fn();
588   });
589   std::unique_lock<std::mutex> l(m);
590   cv.wait(l, [&] { return ready; });
591
592   return true;
593 }
594
595 bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(FuncRef fn) {
596   if (isInEventBaseThread()) {
597     fn();
598     return true;
599   } else {
600     return runInEventBaseThreadAndWait(std::move(fn));
601   }
602 }
603
604 bool EventBase::runLoopCallbacks() {
605   if (!loopCallbacks_.empty()) {
606     bumpHandlingTime();
607     // Swap the loopCallbacks_ list with a temporary list on our stack.
608     // This way we will only run callbacks scheduled at the time
609     // runLoopCallbacks() was invoked.
610     //
611     // If any of these callbacks in turn call runInLoop() to schedule more
612     // callbacks, those new callbacks won't be run until the next iteration
613     // around the event loop.  This prevents runInLoop() callbacks from being
614     // able to start file descriptor and timeout based events.
615     LoopCallbackList currentCallbacks;
616     currentCallbacks.swap(loopCallbacks_);
617     runOnceCallbacks_ = &currentCallbacks;
618
619     while (!currentCallbacks.empty()) {
620       LoopCallback* callback = &currentCallbacks.front();
621       currentCallbacks.pop_front();
622       folly::RequestContextScopeGuard rctx(callback->context_);
623       callback->runLoopCallback();
624     }
625
626     runOnceCallbacks_ = nullptr;
627     return true;
628   }
629   return false;
630 }
631
632 void EventBase::initNotificationQueue() {
633   // Infinite size queue
634   queue_.reset(new NotificationQueue<Func>());
635
636   // We allocate fnRunner_ separately, rather than declaring it directly
637   // as a member of EventBase solely so that we don't need to include
638   // NotificationQueue.h from EventBase.h
639   fnRunner_.reset(new FunctionRunner());
640
641   // Mark this as an internal event, so event_base_loop() will return if
642   // there are no other events besides this one installed.
643   //
644   // Most callers don't care about the internal notification queue used by
645   // EventBase.  The queue is always installed, so if we did count the queue as
646   // an active event, loop() would never exit with no more events to process.
647   // Users can use loopForever() if they do care about the notification queue.
648   // (This is useful for EventBase threads that do nothing but process
649   // runInEventBaseThread() notifications.)
650   fnRunner_->startConsumingInternal(this, queue_.get());
651 }
652
653 void EventBase::SmoothLoopTime::setTimeInterval(
654     std::chrono::microseconds timeInterval) {
655   expCoeff_ = -1.0 / timeInterval.count();
656   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
657 }
658
659 void EventBase::SmoothLoopTime::reset(double value) {
660   value_ = value;
661 }
662
663 void EventBase::SmoothLoopTime::addSample(
664     std::chrono::microseconds idle,
665     std::chrono::microseconds busy) {
666   /*
667    * Position at which the busy sample is considered to be taken.
668    * (Allows to quickly skew our average without editing much code)
669    */
670   enum BusySamplePosition {
671     RIGHT = 0, // busy sample placed at the end of the iteration
672     CENTER = 1, // busy sample placed at the middle point of the iteration
673     LEFT = 2, // busy sample placed at the beginning of the iteration
674   };
675
676   // See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
677   // and D676020 for more info on this calculation.
678   VLOG(11) << "idle " << idle.count() << " oldBusyLeftover_ "
679            << oldBusyLeftover_.count() << " idle + oldBusyLeftover_ "
680            << (idle + oldBusyLeftover_).count() << " busy " << busy.count()
681            << " " << __PRETTY_FUNCTION__;
682   idle += oldBusyLeftover_ + busy;
683   oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
684   idle -= oldBusyLeftover_;
685
686   double coeff = exp(idle.count() * expCoeff_);
687   value_ *= coeff;
688   value_ += (1.0 - coeff) * busy.count();
689 }
690
691 bool EventBase::nothingHandledYet() const noexcept {
692   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
693   return (nextLoopCnt_ != latestLoopCnt_);
694 }
695
696 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
697                                       InternalEnum internal) {
698
699   struct event* ev = obj->getEvent();
700   assert(ev->ev_base == nullptr);
701
702   event_base_set(getLibeventBase(), ev);
703   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
704     // Set the EVLIST_INTERNAL flag
705     event_ref_flags(ev) |= EVLIST_INTERNAL;
706   }
707 }
708
709 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
710   cancelTimeout(obj);
711   struct event* ev = obj->getEvent();
712   ev->ev_base = nullptr;
713 }
714
715 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
716                                  TimeoutManager::timeout_type timeout) {
717   dcheckIsInEventBaseThread();
718   // Set up the timeval and add the event
719   struct timeval tv;
720   tv.tv_sec = long(timeout.count() / 1000LL);
721   tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL);
722
723   struct event* ev = obj->getEvent();
724   if (event_add(ev, &tv) < 0) {
725     LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
726     return false;
727   }
728
729   return true;
730 }
731
732 void EventBase::cancelTimeout(AsyncTimeout* obj) {
733   dcheckIsInEventBaseThread();
734   struct event* ev = obj->getEvent();
735   if (EventUtil::isEventRegistered(ev)) {
736     event_del(ev);
737   }
738 }
739
740 void EventBase::setName(const std::string& name) {
741   dcheckIsInEventBaseThread();
742   name_ = name;
743
744   if (isRunning()) {
745     setThreadName(loopThread_.load(std::memory_order_relaxed),
746                   name_);
747   }
748 }
749
750 const std::string& EventBase::getName() {
751   dcheckIsInEventBaseThread();
752   return name_;
753 }
754
755 const char* EventBase::getLibeventVersion() { return event_get_version(); }
756 const char* EventBase::getLibeventMethod() { return event_get_method(); }
757
758 VirtualEventBase& EventBase::getVirtualEventBase() {
759   folly::call_once(virtualEventBaseInitFlag_, [&] {
760     virtualEventBase_ = std::make_unique<VirtualEventBase>(*this);
761   });
762
763   return *virtualEventBase_;
764 }
765
766 } // folly