062ec2d3dd4e1f2726a31644eb126cb6a7311115
[folly.git] / folly / io / async / EventBase.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/VirtualEventBase.h>
23
24 #include <folly/Memory.h>
25 #include <folly/ThreadName.h>
26 #include <folly/io/async/NotificationQueue.h>
27 #include <folly/portability/Unistd.h>
28
29 #include <condition_variable>
30 #include <fcntl.h>
31 #include <mutex>
32 #include <thread>
33
34 namespace folly {
35
36 /*
37  * EventBase::FunctionRunner
38  */
39
40 class EventBase::FunctionRunner
41     : public NotificationQueue<EventBase::Func>::Consumer {
42  public:
43   void messageAvailable(Func&& msg) override {
44     // In libevent2, internal events do not break the loop.
45     // Most users would expect loop(), followed by runInEventBaseThread(),
46     // to break the loop and check if it should exit or not.
47     // To have similar bejaviour to libevent1.4, tell the loop to break here.
48     // Note that loop() may still continue to loop, but it will also check the
49     // stop_ flag as well as runInLoop callbacks, etc.
50     event_base_loopbreak(getEventBase()->evb_);
51
52     if (!msg) {
53       // terminateLoopSoon() sends a null message just to
54       // wake up the loop.  We can ignore these messages.
55       return;
56     }
57
58     // The function should never throw an exception, because we have no
59     // way of knowing what sort of error handling to perform.
60     //
61     // If it does throw, log a message and abort the program.
62     try {
63       msg();
64     } catch (const std::exception& ex) {
65       LOG(ERROR) << "runInEventBaseThread() function threw a "
66                  << typeid(ex).name() << " exception: " << ex.what();
67       abort();
68     } catch (...) {
69       LOG(ERROR) << "runInEventBaseThread() function threw an exception";
70       abort();
71     }
72   }
73 };
74
75 // The interface used to libevent is not thread-safe.  Calls to
76 // event_init() and event_base_free() directly modify an internal
77 // global 'current_base', so a mutex is required to protect this.
78 //
79 // event_init() should only ever be called once.  Subsequent calls
80 // should be made to event_base_new().  We can recognise that
81 // event_init() has already been called by simply inspecting current_base.
82 static std::mutex libevent_mutex_;
83
84 /*
85  * EventBase methods
86  */
87
88 EventBase::EventBase(bool enableTimeMeasurement)
89   : runOnceCallbacks_(nullptr)
90   , stop_(false)
91   , loopThread_()
92   , queue_(nullptr)
93   , fnRunner_(nullptr)
94   , maxLatency_(0)
95   , avgLoopTime_(std::chrono::seconds(2))
96   , maxLatencyLoopTime_(avgLoopTime_)
97   , enableTimeMeasurement_(enableTimeMeasurement)
98   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
99   , latestLoopCnt_(nextLoopCnt_)
100   , startWork_()
101   , observer_(nullptr)
102   , observerSampleCount_(0)
103   , executionObserver_(nullptr) {
104   struct event ev;
105   {
106     std::lock_guard<std::mutex> lock(libevent_mutex_);
107
108     // The value 'current_base' (libevent 1) or
109     // 'event_global_current_base_' (libevent 2) is filled in by event_set(),
110     // allowing examination of its value without an explicit reference here.
111     // If ev.ev_base is NULL, then event_init() must be called, otherwise
112     // call event_base_new().
113     event_set(&ev, 0, 0, nullptr, nullptr);
114     if (!ev.ev_base) {
115       evb_ = event_init();
116     }
117   }
118
119   if (ev.ev_base) {
120     evb_ = event_base_new();
121   }
122
123   if (UNLIKELY(evb_ == nullptr)) {
124     LOG(ERROR) << "EventBase(): Failed to init event base.";
125     folly::throwSystemError("error in EventBase::EventBase()");
126   }
127   VLOG(5) << "EventBase(): Created.";
128   initNotificationQueue();
129   RequestContext::saveContext();
130 }
131
132 // takes ownership of the event_base
133 EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
134   : runOnceCallbacks_(nullptr)
135   , stop_(false)
136   , loopThread_()
137   , evb_(evb)
138   , queue_(nullptr)
139   , fnRunner_(nullptr)
140   , maxLatency_(0)
141   , avgLoopTime_(std::chrono::seconds(2))
142   , maxLatencyLoopTime_(avgLoopTime_)
143   , enableTimeMeasurement_(enableTimeMeasurement)
144   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
145   , latestLoopCnt_(nextLoopCnt_)
146   , startWork_()
147   , observer_(nullptr)
148   , observerSampleCount_(0)
149   , executionObserver_(nullptr) {
150   if (UNLIKELY(evb_ == nullptr)) {
151     LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
152     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
153   }
154   initNotificationQueue();
155   RequestContext::saveContext();
156 }
157
158 EventBase::~EventBase() {
159   std::future<void> virtualEventBaseDestroyFuture;
160   if (virtualEventBase_) {
161     virtualEventBaseDestroyFuture = virtualEventBase_->destroy();
162   }
163
164   // Keep looping until all keep-alive handles are released. Each keep-alive
165   // handle signals that some external code will still schedule some work on
166   // this EventBase (so it's not safe to destroy it).
167   while (loopKeepAliveCount() > 0) {
168     applyLoopKeepAlive();
169     loopOnce();
170   }
171
172   if (virtualEventBaseDestroyFuture.valid()) {
173     virtualEventBaseDestroyFuture.get();
174   }
175
176   // Call all destruction callbacks, before we start cleaning up our state.
177   while (!onDestructionCallbacks_.empty()) {
178     LoopCallback* callback = &onDestructionCallbacks_.front();
179     onDestructionCallbacks_.pop_front();
180     callback->runLoopCallback();
181   }
182
183   clearCobTimeouts();
184
185   DCHECK_EQ(0u, runBeforeLoopCallbacks_.size());
186
187   (void)runLoopCallbacks();
188
189   if (!fnRunner_->consumeUntilDrained()) {
190     LOG(ERROR) << "~EventBase(): Unable to drain notification queue";
191   }
192
193   // Stop consumer before deleting NotificationQueue
194   fnRunner_->stopConsuming();
195   {
196     std::lock_guard<std::mutex> lock(libevent_mutex_);
197     event_base_free(evb_);
198   }
199
200   {
201     std::lock_guard<std::mutex> lock(localStorageMutex_);
202     for (auto storage : localStorageToDtor_) {
203       storage->onEventBaseDestruction(*this);
204     }
205   }
206   VLOG(5) << "EventBase(): Destroyed.";
207 }
208
209 size_t EventBase::getNotificationQueueSize() const {
210   return queue_->size();
211 }
212
213 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
214   fnRunner_->setMaxReadAtOnce(maxAtOnce);
215 }
216
217 // Set smoothing coefficient for loop load average; input is # of milliseconds
218 // for exp(-1) decay.
219 void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
220   assert(enableTimeMeasurement_);
221   std::chrono::microseconds us = std::chrono::milliseconds(ms);
222   if (ms > std::chrono::milliseconds::zero()) {
223     maxLatencyLoopTime_.setTimeInterval(us);
224     avgLoopTime_.setTimeInterval(us);
225   } else {
226     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
227   }
228 }
229
230 void EventBase::resetLoadAvg(double value) {
231   assert(enableTimeMeasurement_);
232   avgLoopTime_.reset(value);
233   maxLatencyLoopTime_.reset(value);
234 }
235
236 static std::chrono::milliseconds
237 getTimeDelta(std::chrono::steady_clock::time_point* prev) {
238   auto result = std::chrono::steady_clock::now() - *prev;
239   *prev = std::chrono::steady_clock::now();
240
241   return std::chrono::duration_cast<std::chrono::milliseconds>(result);
242 }
243
244 void EventBase::waitUntilRunning() {
245   while (!isRunning()) {
246     std::this_thread::yield();
247   }
248 }
249
250 // enters the event_base loop -- will only exit when forced to
251 bool EventBase::loop() {
252   return loopBody();
253 }
254
255 bool EventBase::loopOnce(int flags) {
256   return loopBody(flags | EVLOOP_ONCE);
257 }
258
259 bool EventBase::loopBody(int flags) {
260   VLOG(5) << "EventBase(): Starting loop.";
261
262   DCHECK(!invokingLoop_)
263       << "Your code just tried to loop over an event base from inside another "
264       << "event base loop. Since libevent is not reentrant, this leads to "
265       << "undefined behavior in opt builds. Please fix immediately. For the "
266       << "common case of an inner function that needs to do some synchronous "
267       << "computation on an event-base, replace getEventBase() by a new, "
268       << "stack-allocated EvenBase.";
269   invokingLoop_ = true;
270   SCOPE_EXIT {
271     invokingLoop_ = false;
272   };
273
274   int res = 0;
275   bool ranLoopCallbacks;
276   bool blocking = !(flags & EVLOOP_NONBLOCK);
277   bool once = (flags & EVLOOP_ONCE);
278
279   // time-measurement variables.
280   std::chrono::steady_clock::time_point prev;
281   std::chrono::steady_clock::time_point idleStart = {};
282   std::chrono::microseconds busy;
283   std::chrono::microseconds idle;
284
285   loopThread_.store(std::this_thread::get_id(), std::memory_order_release);
286
287   if (!name_.empty()) {
288     setThreadName(name_);
289   }
290
291   if (enableTimeMeasurement_) {
292     prev = std::chrono::steady_clock::now();
293     idleStart = std::chrono::steady_clock::now();
294   }
295
296   while (!stop_.load(std::memory_order_acquire)) {
297     applyLoopKeepAlive();
298     ++nextLoopCnt_;
299
300     // Run the before loop callbacks
301     LoopCallbackList callbacks;
302     callbacks.swap(runBeforeLoopCallbacks_);
303
304     while(!callbacks.empty()) {
305       auto* item = &callbacks.front();
306       callbacks.pop_front();
307       item->runLoopCallback();
308     }
309
310     // nobody can add loop callbacks from within this thread if
311     // we don't have to handle anything to start with...
312     if (blocking && loopCallbacks_.empty()) {
313       res = event_base_loop(evb_, EVLOOP_ONCE);
314     } else {
315       res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
316     }
317
318     ranLoopCallbacks = runLoopCallbacks();
319
320     if (enableTimeMeasurement_) {
321       busy = std::chrono::duration_cast<std::chrono::microseconds>(
322           std::chrono::steady_clock::now() - startWork_);
323       idle = std::chrono::duration_cast<std::chrono::microseconds>(
324           startWork_ - idleStart);
325
326       avgLoopTime_.addSample(std::chrono::microseconds(idle),
327         std::chrono::microseconds(busy));
328       maxLatencyLoopTime_.addSample(std::chrono::microseconds(idle),
329         std::chrono::microseconds(busy));
330
331       if (observer_) {
332         if (observerSampleCount_++ == observer_->getSampleRate()) {
333           observerSampleCount_ = 0;
334           observer_->loopSample(busy.count(), idle.count());
335         }
336       }
337
338       VLOG(11) << "EventBase "  << this         << " did not timeout " <<
339         " loop time guess: "    << (busy + idle).count()  <<
340         " idle time: "          << idle.count()         <<
341         " busy time: "          << busy.count()         <<
342         " avgLoopTime: "        << avgLoopTime_.get() <<
343         " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
344         " maxLatency_: "        << maxLatency_.count() << "us" <<
345         " notificationQueueSize: " << getNotificationQueueSize() <<
346         " nothingHandledYet(): " << nothingHandledYet();
347
348       // see if our average loop time has exceeded our limit
349       if ((maxLatency_ > std::chrono::microseconds::zero()) &&
350           (maxLatencyLoopTime_.get() > double(maxLatency_.count()))) {
351         maxLatencyCob_();
352         // back off temporarily -- don't keep spamming maxLatencyCob_
353         // if we're only a bit over the limit
354         maxLatencyLoopTime_.dampen(0.9);
355       }
356
357       // Our loop run did real work; reset the idle timer
358       idleStart = std::chrono::steady_clock::now();
359     } else {
360       VLOG(11) << "EventBase " << this << " did not timeout";
361     }
362
363     // If the event loop indicate that there were no more events, and
364     // we also didn't have any loop callbacks to run, there is nothing left to
365     // do.
366     if (res != 0 && !ranLoopCallbacks) {
367       // Since Notification Queue is marked 'internal' some events may not have
368       // run.  Run them manually if so, and continue looping.
369       //
370       if (getNotificationQueueSize() > 0) {
371         fnRunner_->handlerReady(0);
372       } else {
373         break;
374       }
375     }
376
377     if (enableTimeMeasurement_) {
378       VLOG(11) << "EventBase " << this << " loop time: " <<
379         getTimeDelta(&prev).count();
380     }
381
382     if (once) {
383       break;
384     }
385   }
386   // Reset stop_ so loop() can be called again
387   stop_ = false;
388
389   if (res < 0) {
390     LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
391     return false;
392   } else if (res == 1) {
393     VLOG(5) << "EventBase: ran out of events (exiting loop)!";
394   } else if (res > 1) {
395     LOG(ERROR) << "EventBase: unknown event loop result = " << res;
396     return false;
397   }
398
399   loopThread_.store({}, std::memory_order_release);
400
401   VLOG(5) << "EventBase(): Done with loop.";
402   return true;
403 }
404
405 ssize_t EventBase::loopKeepAliveCount() {
406   if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) {
407     loopKeepAliveCount_ +=
408         loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed);
409   }
410   DCHECK_GE(loopKeepAliveCount_, 0);
411   return loopKeepAliveCount_;
412 }
413
414 void EventBase::applyLoopKeepAlive() {
415   if (loopKeepAliveActive_ && loopKeepAliveCount() == 0) {
416     // Restore the notification queue internal flag
417     fnRunner_->stopConsuming();
418     fnRunner_->startConsumingInternal(this, queue_.get());
419     loopKeepAliveActive_ = false;
420   } else if (!loopKeepAliveActive_ && loopKeepAliveCount() > 0) {
421     // Update the notification queue event to treat it as a normal
422     // (non-internal) event.  The notification queue event always remains
423     // installed, and the main loop won't exit with it installed.
424     fnRunner_->stopConsuming();
425     fnRunner_->startConsuming(this, queue_.get());
426     loopKeepAliveActive_ = true;
427   }
428 }
429
430 void EventBase::loopForever() {
431   bool ret;
432   {
433     SCOPE_EXIT {
434       applyLoopKeepAlive();
435     };
436     // Make sure notification queue events are treated as normal events.
437     // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
438     // released inside a loop.
439     ++loopKeepAliveCount_;
440     SCOPE_EXIT {
441       --loopKeepAliveCount_;
442     };
443     ret = loop();
444   }
445
446   if (!ret) {
447     folly::throwSystemError("error in EventBase::loopForever()");
448   }
449 }
450
451 void EventBase::bumpHandlingTime() {
452   if (!enableTimeMeasurement_) {
453     return;
454   }
455
456   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
457     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
458   if (nothingHandledYet()) {
459     latestLoopCnt_ = nextLoopCnt_;
460     // set the time
461     startWork_ = std::chrono::steady_clock::now();
462
463     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
464              << " (loop) startWork_ " << startWork_.time_since_epoch().count();
465   }
466 }
467
468 void EventBase::terminateLoopSoon() {
469   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
470
471   // Set stop to true, so the event loop will know to exit.
472   // TODO: We should really use an atomic operation here with a release
473   // barrier.
474   stop_ = true;
475
476   // Call event_base_loopbreak() so that libevent will exit the next time
477   // around the loop.
478   event_base_loopbreak(evb_);
479
480   // If terminateLoopSoon() is called from another thread,
481   // the EventBase thread might be stuck waiting for events.
482   // In this case, it won't wake up and notice that stop_ is set until it
483   // receives another event.  Send an empty frame to the notification queue
484   // so that the event loop will wake up even if there are no other events.
485   //
486   // We don't care about the return value of trySendFrame().  If it fails
487   // this likely means the EventBase already has lots of events waiting
488   // anyway.
489   try {
490     queue_->putMessage(nullptr);
491   } catch (...) {
492     // We don't care if putMessage() fails.  This likely means
493     // the EventBase already has lots of events waiting anyway.
494   }
495 }
496
497 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
498   DCHECK(isInEventBaseThread());
499   callback->cancelLoopCallback();
500   callback->context_ = RequestContext::saveContext();
501   if (runOnceCallbacks_ != nullptr && thisIteration) {
502     runOnceCallbacks_->push_back(*callback);
503   } else {
504     loopCallbacks_.push_back(*callback);
505   }
506 }
507
508 void EventBase::runInLoop(Func cob, bool thisIteration) {
509   DCHECK(isInEventBaseThread());
510   auto wrapper = new FunctionLoopCallback(std::move(cob));
511   wrapper->context_ = RequestContext::saveContext();
512   if (runOnceCallbacks_ != nullptr && thisIteration) {
513     runOnceCallbacks_->push_back(*wrapper);
514   } else {
515     loopCallbacks_.push_back(*wrapper);
516   }
517 }
518
519 void EventBase::runOnDestruction(LoopCallback* callback) {
520   std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
521   callback->cancelLoopCallback();
522   onDestructionCallbacks_.push_back(*callback);
523 }
524
525 void EventBase::runBeforeLoop(LoopCallback* callback) {
526   DCHECK(isInEventBaseThread());
527   callback->cancelLoopCallback();
528   runBeforeLoopCallbacks_.push_back(*callback);
529 }
530
531 bool EventBase::runInEventBaseThread(Func fn) {
532   // Send the message.
533   // It will be received by the FunctionRunner in the EventBase's thread.
534
535   // We try not to schedule nullptr callbacks
536   if (!fn) {
537     LOG(ERROR) << "EventBase " << this
538                << ": Scheduling nullptr callbacks is not allowed";
539     return false;
540   }
541
542   // Short-circuit if we are already in our event base
543   if (inRunningEventBaseThread()) {
544     runInLoop(std::move(fn));
545     return true;
546
547   }
548
549   try {
550     queue_->putMessage(std::move(fn));
551   } catch (const std::exception& ex) {
552     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
553                << "for EventBase thread: " << ex.what();
554     return false;
555   }
556
557   return true;
558 }
559
560 bool EventBase::runInEventBaseThreadAndWait(FuncRef fn) {
561   if (inRunningEventBaseThread()) {
562     LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
563                << "allowed";
564     return false;
565   }
566
567   bool ready = false;
568   std::mutex m;
569   std::condition_variable cv;
570   runInEventBaseThread([&] {
571       SCOPE_EXIT {
572         std::unique_lock<std::mutex> l(m);
573         ready = true;
574         cv.notify_one();
575         // We cannot release the lock before notify_one, because a spurious
576         // wakeup in the waiting thread may lead to cv and m going out of scope
577         // prematurely.
578       };
579       fn();
580   });
581   std::unique_lock<std::mutex> l(m);
582   cv.wait(l, [&] { return ready; });
583
584   return true;
585 }
586
587 bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(FuncRef fn) {
588   if (isInEventBaseThread()) {
589     fn();
590     return true;
591   } else {
592     return runInEventBaseThreadAndWait(std::move(fn));
593   }
594 }
595
596 bool EventBase::runLoopCallbacks() {
597   if (!loopCallbacks_.empty()) {
598     bumpHandlingTime();
599     // Swap the loopCallbacks_ list with a temporary list on our stack.
600     // This way we will only run callbacks scheduled at the time
601     // runLoopCallbacks() was invoked.
602     //
603     // If any of these callbacks in turn call runInLoop() to schedule more
604     // callbacks, those new callbacks won't be run until the next iteration
605     // around the event loop.  This prevents runInLoop() callbacks from being
606     // able to start file descriptor and timeout based events.
607     LoopCallbackList currentCallbacks;
608     currentCallbacks.swap(loopCallbacks_);
609     runOnceCallbacks_ = &currentCallbacks;
610
611     while (!currentCallbacks.empty()) {
612       LoopCallback* callback = &currentCallbacks.front();
613       currentCallbacks.pop_front();
614       folly::RequestContextScopeGuard rctx(callback->context_);
615       callback->runLoopCallback();
616     }
617
618     runOnceCallbacks_ = nullptr;
619     return true;
620   }
621   return false;
622 }
623
624 void EventBase::initNotificationQueue() {
625   // Infinite size queue
626   queue_.reset(new NotificationQueue<Func>());
627
628   // We allocate fnRunner_ separately, rather than declaring it directly
629   // as a member of EventBase solely so that we don't need to include
630   // NotificationQueue.h from EventBase.h
631   fnRunner_.reset(new FunctionRunner());
632
633   // Mark this as an internal event, so event_base_loop() will return if
634   // there are no other events besides this one installed.
635   //
636   // Most callers don't care about the internal notification queue used by
637   // EventBase.  The queue is always installed, so if we did count the queue as
638   // an active event, loop() would never exit with no more events to process.
639   // Users can use loopForever() if they do care about the notification queue.
640   // (This is useful for EventBase threads that do nothing but process
641   // runInEventBaseThread() notifications.)
642   fnRunner_->startConsumingInternal(this, queue_.get());
643 }
644
645 void EventBase::SmoothLoopTime::setTimeInterval(
646     std::chrono::microseconds timeInterval) {
647   expCoeff_ = -1.0 / timeInterval.count();
648   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
649 }
650
651 void EventBase::SmoothLoopTime::reset(double value) {
652   value_ = value;
653 }
654
655 void EventBase::SmoothLoopTime::addSample(
656     std::chrono::microseconds idle,
657     std::chrono::microseconds busy) {
658   /*
659    * Position at which the busy sample is considered to be taken.
660    * (Allows to quickly skew our average without editing much code)
661    */
662   enum BusySamplePosition {
663     RIGHT = 0, // busy sample placed at the end of the iteration
664     CENTER = 1, // busy sample placed at the middle point of the iteration
665     LEFT = 2, // busy sample placed at the beginning of the iteration
666   };
667
668   // See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
669   // and D676020 for more info on this calculation.
670   VLOG(11) << "idle " << idle.count() << " oldBusyLeftover_ "
671            << oldBusyLeftover_.count() << " idle + oldBusyLeftover_ "
672            << (idle + oldBusyLeftover_).count() << " busy " << busy.count()
673            << " " << __PRETTY_FUNCTION__;
674   idle += oldBusyLeftover_ + busy;
675   oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
676   idle -= oldBusyLeftover_;
677
678   double coeff = exp(idle.count() * expCoeff_);
679   value_ *= coeff;
680   value_ += (1.0 - coeff) * busy.count();
681 }
682
683 bool EventBase::nothingHandledYet() const noexcept {
684   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
685   return (nextLoopCnt_ != latestLoopCnt_);
686 }
687
688 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
689                                       InternalEnum internal) {
690
691   struct event* ev = obj->getEvent();
692   assert(ev->ev_base == nullptr);
693
694   event_base_set(getLibeventBase(), ev);
695   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
696     // Set the EVLIST_INTERNAL flag
697     event_ref_flags(ev) |= EVLIST_INTERNAL;
698   }
699 }
700
701 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
702   cancelTimeout(obj);
703   struct event* ev = obj->getEvent();
704   ev->ev_base = nullptr;
705 }
706
707 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
708                                  TimeoutManager::timeout_type timeout) {
709   assert(isInEventBaseThread());
710   // Set up the timeval and add the event
711   struct timeval tv;
712   tv.tv_sec = long(timeout.count() / 1000LL);
713   tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL);
714
715   struct event* ev = obj->getEvent();
716   if (event_add(ev, &tv) < 0) {
717     LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
718     return false;
719   }
720
721   return true;
722 }
723
724 void EventBase::cancelTimeout(AsyncTimeout* obj) {
725   assert(isInEventBaseThread());
726   struct event* ev = obj->getEvent();
727   if (EventUtil::isEventRegistered(ev)) {
728     event_del(ev);
729   }
730 }
731
732 void EventBase::setName(const std::string& name) {
733   assert(isInEventBaseThread());
734   name_ = name;
735
736   if (isRunning()) {
737     setThreadName(loopThread_.load(std::memory_order_relaxed),
738                   name_);
739   }
740 }
741
742 const std::string& EventBase::getName() {
743   assert(isInEventBaseThread());
744   return name_;
745 }
746
747 const char* EventBase::getLibeventVersion() { return event_get_version(); }
748 const char* EventBase::getLibeventMethod() { return event_get_method(); }
749
750 VirtualEventBase& EventBase::getVirtualEventBase() {
751   folly::call_once(virtualEventBaseInitFlag_, [&] {
752     virtualEventBase_ = folly::make_unique<VirtualEventBase>(*this);
753   });
754
755   return *virtualEventBase_;
756 }
757
758 } // folly