Default VirtualEventBase
[folly.git] / folly / io / async / EventBase.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/VirtualEventBase.h>
23
24 #include <folly/ThreadName.h>
25 #include <folly/io/async/NotificationQueue.h>
26 #include <folly/portability/Unistd.h>
27
28 #include <condition_variable>
29 #include <fcntl.h>
30 #include <mutex>
31 #include <thread>
32
33 namespace folly {
34
35 /*
36  * EventBase::FunctionRunner
37  */
38
39 class EventBase::FunctionRunner
40     : public NotificationQueue<EventBase::Func>::Consumer {
41  public:
42   void messageAvailable(Func&& msg) override {
43     // In libevent2, internal events do not break the loop.
44     // Most users would expect loop(), followed by runInEventBaseThread(),
45     // to break the loop and check if it should exit or not.
46     // To have similar bejaviour to libevent1.4, tell the loop to break here.
47     // Note that loop() may still continue to loop, but it will also check the
48     // stop_ flag as well as runInLoop callbacks, etc.
49     event_base_loopbreak(getEventBase()->evb_);
50
51     if (!msg) {
52       // terminateLoopSoon() sends a null message just to
53       // wake up the loop.  We can ignore these messages.
54       return;
55     }
56
57     // The function should never throw an exception, because we have no
58     // way of knowing what sort of error handling to perform.
59     //
60     // If it does throw, log a message and abort the program.
61     try {
62       msg();
63     } catch (const std::exception& ex) {
64       LOG(ERROR) << "runInEventBaseThread() function threw a "
65                  << typeid(ex).name() << " exception: " << ex.what();
66       abort();
67     } catch (...) {
68       LOG(ERROR) << "runInEventBaseThread() function threw an exception";
69       abort();
70     }
71   }
72 };
73
74 // The interface used to libevent is not thread-safe.  Calls to
75 // event_init() and event_base_free() directly modify an internal
76 // global 'current_base', so a mutex is required to protect this.
77 //
78 // event_init() should only ever be called once.  Subsequent calls
79 // should be made to event_base_new().  We can recognise that
80 // event_init() has already been called by simply inspecting current_base.
81 static std::mutex libevent_mutex_;
82
83 /*
84  * EventBase methods
85  */
86
87 EventBase::EventBase(bool enableTimeMeasurement)
88   : runOnceCallbacks_(nullptr)
89   , stop_(false)
90   , loopThread_()
91   , queue_(nullptr)
92   , fnRunner_(nullptr)
93   , maxLatency_(0)
94   , avgLoopTime_(std::chrono::seconds(2))
95   , maxLatencyLoopTime_(avgLoopTime_)
96   , enableTimeMeasurement_(enableTimeMeasurement)
97   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
98   , latestLoopCnt_(nextLoopCnt_)
99   , startWork_()
100   , observer_(nullptr)
101   , observerSampleCount_(0)
102   , executionObserver_(nullptr) {
103   struct event ev;
104   {
105     std::lock_guard<std::mutex> lock(libevent_mutex_);
106
107     // The value 'current_base' (libevent 1) or
108     // 'event_global_current_base_' (libevent 2) is filled in by event_set(),
109     // allowing examination of its value without an explicit reference here.
110     // If ev.ev_base is NULL, then event_init() must be called, otherwise
111     // call event_base_new().
112     event_set(&ev, 0, 0, nullptr, nullptr);
113     if (!ev.ev_base) {
114       evb_ = event_init();
115     }
116   }
117
118   if (ev.ev_base) {
119     evb_ = event_base_new();
120   }
121
122   if (UNLIKELY(evb_ == nullptr)) {
123     LOG(ERROR) << "EventBase(): Failed to init event base.";
124     folly::throwSystemError("error in EventBase::EventBase()");
125   }
126   VLOG(5) << "EventBase(): Created.";
127   initNotificationQueue();
128   RequestContext::saveContext();
129 }
130
131 // takes ownership of the event_base
132 EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
133   : runOnceCallbacks_(nullptr)
134   , stop_(false)
135   , loopThread_()
136   , evb_(evb)
137   , queue_(nullptr)
138   , fnRunner_(nullptr)
139   , maxLatency_(0)
140   , avgLoopTime_(std::chrono::seconds(2))
141   , maxLatencyLoopTime_(avgLoopTime_)
142   , enableTimeMeasurement_(enableTimeMeasurement)
143   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
144   , latestLoopCnt_(nextLoopCnt_)
145   , startWork_()
146   , observer_(nullptr)
147   , observerSampleCount_(0)
148   , executionObserver_(nullptr) {
149   if (UNLIKELY(evb_ == nullptr)) {
150     LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
151     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
152   }
153   initNotificationQueue();
154   RequestContext::saveContext();
155 }
156
157 EventBase::~EventBase() {
158   std::future<void> virtualEventBaseDestroyFuture;
159   if (virtualEventBase_) {
160     virtualEventBaseDestroyFuture = virtualEventBase_->destroy();
161   }
162
163   // Keep looping until all keep-alive handles are released. Each keep-alive
164   // handle signals that some external code will still schedule some work on
165   // this EventBase (so it's not safe to destroy it).
166   while (loopKeepAliveCount() > 0) {
167     applyLoopKeepAlive();
168     loopOnce();
169   }
170
171   if (virtualEventBaseDestroyFuture.valid()) {
172     virtualEventBaseDestroyFuture.get();
173   }
174
175   // Call all destruction callbacks, before we start cleaning up our state.
176   while (!onDestructionCallbacks_.empty()) {
177     LoopCallback* callback = &onDestructionCallbacks_.front();
178     onDestructionCallbacks_.pop_front();
179     callback->runLoopCallback();
180   }
181
182   clearCobTimeouts();
183
184   DCHECK_EQ(0u, runBeforeLoopCallbacks_.size());
185
186   (void)runLoopCallbacks();
187
188   if (!fnRunner_->consumeUntilDrained()) {
189     LOG(ERROR) << "~EventBase(): Unable to drain notification queue";
190   }
191
192   // Stop consumer before deleting NotificationQueue
193   fnRunner_->stopConsuming();
194   {
195     std::lock_guard<std::mutex> lock(libevent_mutex_);
196     event_base_free(evb_);
197   }
198
199   {
200     std::lock_guard<std::mutex> lock(localStorageMutex_);
201     for (auto storage : localStorageToDtor_) {
202       storage->onEventBaseDestruction(*this);
203     }
204   }
205   VLOG(5) << "EventBase(): Destroyed.";
206 }
207
208 size_t EventBase::getNotificationQueueSize() const {
209   return queue_->size();
210 }
211
212 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
213   fnRunner_->setMaxReadAtOnce(maxAtOnce);
214 }
215
216 // Set smoothing coefficient for loop load average; input is # of milliseconds
217 // for exp(-1) decay.
218 void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
219   assert(enableTimeMeasurement_);
220   std::chrono::microseconds us = std::chrono::milliseconds(ms);
221   if (ms > std::chrono::milliseconds::zero()) {
222     maxLatencyLoopTime_.setTimeInterval(us);
223     avgLoopTime_.setTimeInterval(us);
224   } else {
225     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
226   }
227 }
228
229 void EventBase::resetLoadAvg(double value) {
230   assert(enableTimeMeasurement_);
231   avgLoopTime_.reset(value);
232   maxLatencyLoopTime_.reset(value);
233 }
234
235 static std::chrono::milliseconds
236 getTimeDelta(std::chrono::steady_clock::time_point* prev) {
237   auto result = std::chrono::steady_clock::now() - *prev;
238   *prev = std::chrono::steady_clock::now();
239
240   return std::chrono::duration_cast<std::chrono::milliseconds>(result);
241 }
242
243 void EventBase::waitUntilRunning() {
244   while (!isRunning()) {
245     std::this_thread::yield();
246   }
247 }
248
249 // enters the event_base loop -- will only exit when forced to
250 bool EventBase::loop() {
251   return loopBody();
252 }
253
254 bool EventBase::loopOnce(int flags) {
255   return loopBody(flags | EVLOOP_ONCE);
256 }
257
258 bool EventBase::loopBody(int flags) {
259   VLOG(5) << "EventBase(): Starting loop.";
260
261   DCHECK(!invokingLoop_)
262       << "Your code just tried to loop over an event base from inside another "
263       << "event base loop. Since libevent is not reentrant, this leads to "
264       << "undefined behavior in opt builds. Please fix immediately. For the "
265       << "common case of an inner function that needs to do some synchronous "
266       << "computation on an event-base, replace getEventBase() by a new, "
267       << "stack-allocated EvenBase.";
268   invokingLoop_ = true;
269   SCOPE_EXIT {
270     invokingLoop_ = false;
271   };
272
273   int res = 0;
274   bool ranLoopCallbacks;
275   bool blocking = !(flags & EVLOOP_NONBLOCK);
276   bool once = (flags & EVLOOP_ONCE);
277
278   // time-measurement variables.
279   std::chrono::steady_clock::time_point prev;
280   std::chrono::steady_clock::time_point idleStart = {};
281   std::chrono::microseconds busy;
282   std::chrono::microseconds idle;
283
284   loopThread_.store(std::this_thread::get_id(), std::memory_order_release);
285
286   if (!name_.empty()) {
287     setThreadName(name_);
288   }
289
290   if (enableTimeMeasurement_) {
291     prev = std::chrono::steady_clock::now();
292     idleStart = std::chrono::steady_clock::now();
293   }
294
295   while (!stop_.load(std::memory_order_acquire)) {
296     applyLoopKeepAlive();
297     ++nextLoopCnt_;
298
299     // Run the before loop callbacks
300     LoopCallbackList callbacks;
301     callbacks.swap(runBeforeLoopCallbacks_);
302
303     while(!callbacks.empty()) {
304       auto* item = &callbacks.front();
305       callbacks.pop_front();
306       item->runLoopCallback();
307     }
308
309     // nobody can add loop callbacks from within this thread if
310     // we don't have to handle anything to start with...
311     if (blocking && loopCallbacks_.empty()) {
312       res = event_base_loop(evb_, EVLOOP_ONCE);
313     } else {
314       res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
315     }
316
317     ranLoopCallbacks = runLoopCallbacks();
318
319     if (enableTimeMeasurement_) {
320       busy = std::chrono::duration_cast<std::chrono::microseconds>(
321           std::chrono::steady_clock::now() - startWork_);
322       idle = std::chrono::duration_cast<std::chrono::microseconds>(
323           startWork_ - idleStart);
324
325       avgLoopTime_.addSample(std::chrono::microseconds(idle),
326         std::chrono::microseconds(busy));
327       maxLatencyLoopTime_.addSample(std::chrono::microseconds(idle),
328         std::chrono::microseconds(busy));
329
330       if (observer_) {
331         if (observerSampleCount_++ == observer_->getSampleRate()) {
332           observerSampleCount_ = 0;
333           observer_->loopSample(busy.count(), idle.count());
334         }
335       }
336
337       VLOG(11) << "EventBase "  << this         << " did not timeout " <<
338         " loop time guess: "    << (busy + idle).count()  <<
339         " idle time: "          << idle.count()         <<
340         " busy time: "          << busy.count()         <<
341         " avgLoopTime: "        << avgLoopTime_.get() <<
342         " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
343         " maxLatency_: "        << maxLatency_.count() << "us" <<
344         " notificationQueueSize: " << getNotificationQueueSize() <<
345         " nothingHandledYet(): " << nothingHandledYet();
346
347       // see if our average loop time has exceeded our limit
348       if ((maxLatency_ > std::chrono::microseconds::zero()) &&
349           (maxLatencyLoopTime_.get() > double(maxLatency_.count()))) {
350         maxLatencyCob_();
351         // back off temporarily -- don't keep spamming maxLatencyCob_
352         // if we're only a bit over the limit
353         maxLatencyLoopTime_.dampen(0.9);
354       }
355
356       // Our loop run did real work; reset the idle timer
357       idleStart = std::chrono::steady_clock::now();
358     } else {
359       VLOG(11) << "EventBase " << this << " did not timeout";
360     }
361
362     // If the event loop indicate that there were no more events, and
363     // we also didn't have any loop callbacks to run, there is nothing left to
364     // do.
365     if (res != 0 && !ranLoopCallbacks) {
366       // Since Notification Queue is marked 'internal' some events may not have
367       // run.  Run them manually if so, and continue looping.
368       //
369       if (getNotificationQueueSize() > 0) {
370         fnRunner_->handlerReady(0);
371       } else {
372         break;
373       }
374     }
375
376     if (enableTimeMeasurement_) {
377       VLOG(11) << "EventBase " << this << " loop time: " <<
378         getTimeDelta(&prev).count();
379     }
380
381     if (once) {
382       break;
383     }
384   }
385   // Reset stop_ so loop() can be called again
386   stop_ = false;
387
388   if (res < 0) {
389     LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
390     return false;
391   } else if (res == 1) {
392     VLOG(5) << "EventBase: ran out of events (exiting loop)!";
393   } else if (res > 1) {
394     LOG(ERROR) << "EventBase: unknown event loop result = " << res;
395     return false;
396   }
397
398   loopThread_.store({}, std::memory_order_release);
399
400   VLOG(5) << "EventBase(): Done with loop.";
401   return true;
402 }
403
404 ssize_t EventBase::loopKeepAliveCount() {
405   if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) {
406     loopKeepAliveCount_ +=
407         loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed);
408   }
409   DCHECK_GE(loopKeepAliveCount_, 0);
410   return loopKeepAliveCount_;
411 }
412
413 void EventBase::applyLoopKeepAlive() {
414   if (loopKeepAliveActive_ && loopKeepAliveCount() == 0) {
415     // Restore the notification queue internal flag
416     fnRunner_->stopConsuming();
417     fnRunner_->startConsumingInternal(this, queue_.get());
418     loopKeepAliveActive_ = false;
419   } else if (!loopKeepAliveActive_ && loopKeepAliveCount() > 0) {
420     // Update the notification queue event to treat it as a normal
421     // (non-internal) event.  The notification queue event always remains
422     // installed, and the main loop won't exit with it installed.
423     fnRunner_->stopConsuming();
424     fnRunner_->startConsuming(this, queue_.get());
425     loopKeepAliveActive_ = true;
426   }
427 }
428
429 void EventBase::loopForever() {
430   bool ret;
431   {
432     SCOPE_EXIT {
433       applyLoopKeepAlive();
434     };
435     // Make sure notification queue events are treated as normal events.
436     // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
437     // released inside a loop.
438     ++loopKeepAliveCount_;
439     SCOPE_EXIT {
440       --loopKeepAliveCount_;
441     };
442     ret = loop();
443   }
444
445   if (!ret) {
446     folly::throwSystemError("error in EventBase::loopForever()");
447   }
448 }
449
450 void EventBase::bumpHandlingTime() {
451   if (!enableTimeMeasurement_) {
452     return;
453   }
454
455   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
456     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
457   if (nothingHandledYet()) {
458     latestLoopCnt_ = nextLoopCnt_;
459     // set the time
460     startWork_ = std::chrono::steady_clock::now();
461
462     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
463              << " (loop) startWork_ " << startWork_.time_since_epoch().count();
464   }
465 }
466
467 void EventBase::terminateLoopSoon() {
468   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
469
470   // Set stop to true, so the event loop will know to exit.
471   // TODO: We should really use an atomic operation here with a release
472   // barrier.
473   stop_ = true;
474
475   // Call event_base_loopbreak() so that libevent will exit the next time
476   // around the loop.
477   event_base_loopbreak(evb_);
478
479   // If terminateLoopSoon() is called from another thread,
480   // the EventBase thread might be stuck waiting for events.
481   // In this case, it won't wake up and notice that stop_ is set until it
482   // receives another event.  Send an empty frame to the notification queue
483   // so that the event loop will wake up even if there are no other events.
484   //
485   // We don't care about the return value of trySendFrame().  If it fails
486   // this likely means the EventBase already has lots of events waiting
487   // anyway.
488   try {
489     queue_->putMessage(nullptr);
490   } catch (...) {
491     // We don't care if putMessage() fails.  This likely means
492     // the EventBase already has lots of events waiting anyway.
493   }
494 }
495
496 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
497   DCHECK(isInEventBaseThread());
498   callback->cancelLoopCallback();
499   callback->context_ = RequestContext::saveContext();
500   if (runOnceCallbacks_ != nullptr && thisIteration) {
501     runOnceCallbacks_->push_back(*callback);
502   } else {
503     loopCallbacks_.push_back(*callback);
504   }
505 }
506
507 void EventBase::runInLoop(Func cob, bool thisIteration) {
508   DCHECK(isInEventBaseThread());
509   auto wrapper = new FunctionLoopCallback(std::move(cob));
510   wrapper->context_ = RequestContext::saveContext();
511   if (runOnceCallbacks_ != nullptr && thisIteration) {
512     runOnceCallbacks_->push_back(*wrapper);
513   } else {
514     loopCallbacks_.push_back(*wrapper);
515   }
516 }
517
518 void EventBase::runOnDestruction(LoopCallback* callback) {
519   std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
520   callback->cancelLoopCallback();
521   onDestructionCallbacks_.push_back(*callback);
522 }
523
524 void EventBase::runBeforeLoop(LoopCallback* callback) {
525   DCHECK(isInEventBaseThread());
526   callback->cancelLoopCallback();
527   runBeforeLoopCallbacks_.push_back(*callback);
528 }
529
530 bool EventBase::runInEventBaseThread(Func fn) {
531   // Send the message.
532   // It will be received by the FunctionRunner in the EventBase's thread.
533
534   // We try not to schedule nullptr callbacks
535   if (!fn) {
536     LOG(ERROR) << "EventBase " << this
537                << ": Scheduling nullptr callbacks is not allowed";
538     return false;
539   }
540
541   // Short-circuit if we are already in our event base
542   if (inRunningEventBaseThread()) {
543     runInLoop(std::move(fn));
544     return true;
545
546   }
547
548   try {
549     queue_->putMessage(std::move(fn));
550   } catch (const std::exception& ex) {
551     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
552                << "for EventBase thread: " << ex.what();
553     return false;
554   }
555
556   return true;
557 }
558
559 bool EventBase::runInEventBaseThreadAndWait(FuncRef fn) {
560   if (inRunningEventBaseThread()) {
561     LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
562                << "allowed";
563     return false;
564   }
565
566   bool ready = false;
567   std::mutex m;
568   std::condition_variable cv;
569   runInEventBaseThread([&] {
570       SCOPE_EXIT {
571         std::unique_lock<std::mutex> l(m);
572         ready = true;
573         cv.notify_one();
574         // We cannot release the lock before notify_one, because a spurious
575         // wakeup in the waiting thread may lead to cv and m going out of scope
576         // prematurely.
577       };
578       fn();
579   });
580   std::unique_lock<std::mutex> l(m);
581   cv.wait(l, [&] { return ready; });
582
583   return true;
584 }
585
586 bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(FuncRef fn) {
587   if (isInEventBaseThread()) {
588     fn();
589     return true;
590   } else {
591     return runInEventBaseThreadAndWait(std::move(fn));
592   }
593 }
594
595 bool EventBase::runLoopCallbacks() {
596   if (!loopCallbacks_.empty()) {
597     bumpHandlingTime();
598     // Swap the loopCallbacks_ list with a temporary list on our stack.
599     // This way we will only run callbacks scheduled at the time
600     // runLoopCallbacks() was invoked.
601     //
602     // If any of these callbacks in turn call runInLoop() to schedule more
603     // callbacks, those new callbacks won't be run until the next iteration
604     // around the event loop.  This prevents runInLoop() callbacks from being
605     // able to start file descriptor and timeout based events.
606     LoopCallbackList currentCallbacks;
607     currentCallbacks.swap(loopCallbacks_);
608     runOnceCallbacks_ = &currentCallbacks;
609
610     while (!currentCallbacks.empty()) {
611       LoopCallback* callback = &currentCallbacks.front();
612       currentCallbacks.pop_front();
613       folly::RequestContextScopeGuard rctx(callback->context_);
614       callback->runLoopCallback();
615     }
616
617     runOnceCallbacks_ = nullptr;
618     return true;
619   }
620   return false;
621 }
622
623 void EventBase::initNotificationQueue() {
624   // Infinite size queue
625   queue_.reset(new NotificationQueue<Func>());
626
627   // We allocate fnRunner_ separately, rather than declaring it directly
628   // as a member of EventBase solely so that we don't need to include
629   // NotificationQueue.h from EventBase.h
630   fnRunner_.reset(new FunctionRunner());
631
632   // Mark this as an internal event, so event_base_loop() will return if
633   // there are no other events besides this one installed.
634   //
635   // Most callers don't care about the internal notification queue used by
636   // EventBase.  The queue is always installed, so if we did count the queue as
637   // an active event, loop() would never exit with no more events to process.
638   // Users can use loopForever() if they do care about the notification queue.
639   // (This is useful for EventBase threads that do nothing but process
640   // runInEventBaseThread() notifications.)
641   fnRunner_->startConsumingInternal(this, queue_.get());
642 }
643
644 void EventBase::SmoothLoopTime::setTimeInterval(
645     std::chrono::microseconds timeInterval) {
646   expCoeff_ = -1.0 / timeInterval.count();
647   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
648 }
649
650 void EventBase::SmoothLoopTime::reset(double value) {
651   value_ = value;
652 }
653
654 void EventBase::SmoothLoopTime::addSample(
655     std::chrono::microseconds idle,
656     std::chrono::microseconds busy) {
657   /*
658    * Position at which the busy sample is considered to be taken.
659    * (Allows to quickly skew our average without editing much code)
660    */
661   enum BusySamplePosition {
662     RIGHT = 0, // busy sample placed at the end of the iteration
663     CENTER = 1, // busy sample placed at the middle point of the iteration
664     LEFT = 2, // busy sample placed at the beginning of the iteration
665   };
666
667   // See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
668   // and D676020 for more info on this calculation.
669   VLOG(11) << "idle " << idle.count() << " oldBusyLeftover_ "
670            << oldBusyLeftover_.count() << " idle + oldBusyLeftover_ "
671            << (idle + oldBusyLeftover_).count() << " busy " << busy.count()
672            << " " << __PRETTY_FUNCTION__;
673   idle += oldBusyLeftover_ + busy;
674   oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
675   idle -= oldBusyLeftover_;
676
677   double coeff = exp(idle.count() * expCoeff_);
678   value_ *= coeff;
679   value_ += (1.0 - coeff) * busy.count();
680 }
681
682 bool EventBase::nothingHandledYet() const noexcept {
683   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
684   return (nextLoopCnt_ != latestLoopCnt_);
685 }
686
687 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
688                                       InternalEnum internal) {
689
690   struct event* ev = obj->getEvent();
691   assert(ev->ev_base == nullptr);
692
693   event_base_set(getLibeventBase(), ev);
694   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
695     // Set the EVLIST_INTERNAL flag
696     event_ref_flags(ev) |= EVLIST_INTERNAL;
697   }
698 }
699
700 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
701   cancelTimeout(obj);
702   struct event* ev = obj->getEvent();
703   ev->ev_base = nullptr;
704 }
705
706 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
707                                  TimeoutManager::timeout_type timeout) {
708   assert(isInEventBaseThread());
709   // Set up the timeval and add the event
710   struct timeval tv;
711   tv.tv_sec = long(timeout.count() / 1000LL);
712   tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL);
713
714   struct event* ev = obj->getEvent();
715   if (event_add(ev, &tv) < 0) {
716     LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
717     return false;
718   }
719
720   return true;
721 }
722
723 void EventBase::cancelTimeout(AsyncTimeout* obj) {
724   assert(isInEventBaseThread());
725   struct event* ev = obj->getEvent();
726   if (EventUtil::isEventRegistered(ev)) {
727     event_del(ev);
728   }
729 }
730
731 void EventBase::setName(const std::string& name) {
732   assert(isInEventBaseThread());
733   name_ = name;
734
735   if (isRunning()) {
736     setThreadName(loopThread_.load(std::memory_order_relaxed),
737                   name_);
738   }
739 }
740
741 const std::string& EventBase::getName() {
742   assert(isInEventBaseThread());
743   return name_;
744 }
745
746 const char* EventBase::getLibeventVersion() { return event_get_version(); }
747 const char* EventBase::getLibeventMethod() { return event_get_method(); }
748
749 VirtualEventBase& EventBase::getVirtualEventBase() {
750   folly::call_once(virtualEventBaseInitFlag_, [&] {
751     virtualEventBase_ = std::make_unique<VirtualEventBase>(*this);
752   });
753
754   return *virtualEventBase_;
755 }
756
757 } // folly