d0d58def06e20e4d5411155b9bff918680612c4d
[folly.git] / folly / io / async / EventBase.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/VirtualEventBase.h>
23
24 #include <folly/Memory.h>
25 #include <folly/ThreadName.h>
26 #include <folly/io/async/NotificationQueue.h>
27 #include <folly/portability/Unistd.h>
28
29 #include <condition_variable>
30 #include <fcntl.h>
31 #include <mutex>
32 #include <thread>
33
34 namespace folly {
35
36 /*
37  * EventBase::FunctionRunner
38  */
39
40 class EventBase::FunctionRunner
41     : public NotificationQueue<EventBase::Func>::Consumer {
42  public:
43   void messageAvailable(Func&& msg) noexcept override {
44     // In libevent2, internal events do not break the loop.
45     // Most users would expect loop(), followed by runInEventBaseThread(),
46     // to break the loop and check if it should exit or not.
47     // To have similar bejaviour to libevent1.4, tell the loop to break here.
48     // Note that loop() may still continue to loop, but it will also check the
49     // stop_ flag as well as runInLoop callbacks, etc.
50     event_base_loopbreak(getEventBase()->evb_);
51
52     if (!msg) {
53       // terminateLoopSoon() sends a null message just to
54       // wake up the loop.  We can ignore these messages.
55       return;
56     }
57     msg();
58   }
59 };
60
61 // The interface used to libevent is not thread-safe.  Calls to
62 // event_init() and event_base_free() directly modify an internal
63 // global 'current_base', so a mutex is required to protect this.
64 //
65 // event_init() should only ever be called once.  Subsequent calls
66 // should be made to event_base_new().  We can recognise that
67 // event_init() has already been called by simply inspecting current_base.
68 static std::mutex libevent_mutex_;
69
70 /*
71  * EventBase methods
72  */
73
74 EventBase::EventBase(bool enableTimeMeasurement)
75   : runOnceCallbacks_(nullptr)
76   , stop_(false)
77   , loopThread_()
78   , queue_(nullptr)
79   , fnRunner_(nullptr)
80   , maxLatency_(0)
81   , avgLoopTime_(std::chrono::seconds(2))
82   , maxLatencyLoopTime_(avgLoopTime_)
83   , enableTimeMeasurement_(enableTimeMeasurement)
84   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
85   , latestLoopCnt_(nextLoopCnt_)
86   , startWork_()
87   , observer_(nullptr)
88   , observerSampleCount_(0)
89   , executionObserver_(nullptr) {
90   struct event ev;
91   {
92     std::lock_guard<std::mutex> lock(libevent_mutex_);
93
94     // The value 'current_base' (libevent 1) or
95     // 'event_global_current_base_' (libevent 2) is filled in by event_set(),
96     // allowing examination of its value without an explicit reference here.
97     // If ev.ev_base is NULL, then event_init() must be called, otherwise
98     // call event_base_new().
99     event_set(&ev, 0, 0, nullptr, nullptr);
100     if (!ev.ev_base) {
101       evb_ = event_init();
102     }
103   }
104
105   if (ev.ev_base) {
106     evb_ = event_base_new();
107   }
108
109   if (UNLIKELY(evb_ == nullptr)) {
110     LOG(ERROR) << "EventBase(): Failed to init event base.";
111     folly::throwSystemError("error in EventBase::EventBase()");
112   }
113   VLOG(5) << "EventBase(): Created.";
114   initNotificationQueue();
115   RequestContext::saveContext();
116 }
117
118 // takes ownership of the event_base
119 EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
120   : runOnceCallbacks_(nullptr)
121   , stop_(false)
122   , loopThread_()
123   , evb_(evb)
124   , queue_(nullptr)
125   , fnRunner_(nullptr)
126   , maxLatency_(0)
127   , avgLoopTime_(std::chrono::seconds(2))
128   , maxLatencyLoopTime_(avgLoopTime_)
129   , enableTimeMeasurement_(enableTimeMeasurement)
130   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
131   , latestLoopCnt_(nextLoopCnt_)
132   , startWork_()
133   , observer_(nullptr)
134   , observerSampleCount_(0)
135   , executionObserver_(nullptr) {
136   if (UNLIKELY(evb_ == nullptr)) {
137     LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
138     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
139   }
140   initNotificationQueue();
141   RequestContext::saveContext();
142 }
143
144 EventBase::~EventBase() {
145   std::future<void> virtualEventBaseDestroyFuture;
146   if (virtualEventBase_) {
147     virtualEventBaseDestroyFuture = virtualEventBase_->destroy();
148   }
149
150   // Keep looping until all keep-alive handles are released. Each keep-alive
151   // handle signals that some external code will still schedule some work on
152   // this EventBase (so it's not safe to destroy it).
153   while (loopKeepAliveCount() > 0) {
154     applyLoopKeepAlive();
155     loopOnce();
156   }
157
158   if (virtualEventBaseDestroyFuture.valid()) {
159     virtualEventBaseDestroyFuture.get();
160   }
161
162   // Call all destruction callbacks, before we start cleaning up our state.
163   while (!onDestructionCallbacks_.empty()) {
164     LoopCallback* callback = &onDestructionCallbacks_.front();
165     onDestructionCallbacks_.pop_front();
166     callback->runLoopCallback();
167   }
168
169   clearCobTimeouts();
170
171   DCHECK_EQ(0u, runBeforeLoopCallbacks_.size());
172
173   (void)runLoopCallbacks();
174
175   if (!fnRunner_->consumeUntilDrained()) {
176     LOG(ERROR) << "~EventBase(): Unable to drain notification queue";
177   }
178
179   // Stop consumer before deleting NotificationQueue
180   fnRunner_->stopConsuming();
181   {
182     std::lock_guard<std::mutex> lock(libevent_mutex_);
183     event_base_free(evb_);
184   }
185
186   for (auto storage : localStorageToDtor_) {
187     storage->onEventBaseDestruction(*this);
188   }
189
190   VLOG(5) << "EventBase(): Destroyed.";
191 }
192
193 size_t EventBase::getNotificationQueueSize() const {
194   return queue_->size();
195 }
196
197 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
198   fnRunner_->setMaxReadAtOnce(maxAtOnce);
199 }
200
201 // Set smoothing coefficient for loop load average; input is # of milliseconds
202 // for exp(-1) decay.
203 void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
204   assert(enableTimeMeasurement_);
205   std::chrono::microseconds us = std::chrono::milliseconds(ms);
206   if (ms > std::chrono::milliseconds::zero()) {
207     maxLatencyLoopTime_.setTimeInterval(us);
208     avgLoopTime_.setTimeInterval(us);
209   } else {
210     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
211   }
212 }
213
214 void EventBase::resetLoadAvg(double value) {
215   assert(enableTimeMeasurement_);
216   avgLoopTime_.reset(value);
217   maxLatencyLoopTime_.reset(value);
218 }
219
220 static std::chrono::milliseconds
221 getTimeDelta(std::chrono::steady_clock::time_point* prev) {
222   auto result = std::chrono::steady_clock::now() - *prev;
223   *prev = std::chrono::steady_clock::now();
224
225   return std::chrono::duration_cast<std::chrono::milliseconds>(result);
226 }
227
228 void EventBase::waitUntilRunning() {
229   while (!isRunning()) {
230     std::this_thread::yield();
231   }
232 }
233
234 // enters the event_base loop -- will only exit when forced to
235 bool EventBase::loop() {
236   return loopBody();
237 }
238
239 bool EventBase::loopOnce(int flags) {
240   return loopBody(flags | EVLOOP_ONCE);
241 }
242
243 bool EventBase::loopBody(int flags) {
244   VLOG(5) << "EventBase(): Starting loop.";
245
246   DCHECK(!invokingLoop_)
247       << "Your code just tried to loop over an event base from inside another "
248       << "event base loop. Since libevent is not reentrant, this leads to "
249       << "undefined behavior in opt builds. Please fix immediately. For the "
250       << "common case of an inner function that needs to do some synchronous "
251       << "computation on an event-base, replace getEventBase() by a new, "
252       << "stack-allocated EvenBase.";
253   invokingLoop_ = true;
254   SCOPE_EXIT {
255     invokingLoop_ = false;
256   };
257
258   int res = 0;
259   bool ranLoopCallbacks;
260   bool blocking = !(flags & EVLOOP_NONBLOCK);
261   bool once = (flags & EVLOOP_ONCE);
262
263   // time-measurement variables.
264   std::chrono::steady_clock::time_point prev;
265   std::chrono::steady_clock::time_point idleStart = {};
266   std::chrono::microseconds busy;
267   std::chrono::microseconds idle;
268
269   loopThread_.store(std::this_thread::get_id(), std::memory_order_release);
270
271   if (!name_.empty()) {
272     setThreadName(name_);
273   }
274
275   if (enableTimeMeasurement_) {
276     prev = std::chrono::steady_clock::now();
277     idleStart = std::chrono::steady_clock::now();
278   }
279
280   while (!stop_.load(std::memory_order_acquire)) {
281     applyLoopKeepAlive();
282     ++nextLoopCnt_;
283
284     // Run the before loop callbacks
285     LoopCallbackList callbacks;
286     callbacks.swap(runBeforeLoopCallbacks_);
287
288     while(!callbacks.empty()) {
289       auto* item = &callbacks.front();
290       callbacks.pop_front();
291       item->runLoopCallback();
292     }
293
294     // nobody can add loop callbacks from within this thread if
295     // we don't have to handle anything to start with...
296     if (blocking && loopCallbacks_.empty()) {
297       res = event_base_loop(evb_, EVLOOP_ONCE);
298     } else {
299       res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
300     }
301
302     ranLoopCallbacks = runLoopCallbacks();
303
304     if (enableTimeMeasurement_) {
305       busy = std::chrono::duration_cast<std::chrono::microseconds>(
306           std::chrono::steady_clock::now() - startWork_);
307       idle = std::chrono::duration_cast<std::chrono::microseconds>(
308           startWork_ - idleStart);
309
310       avgLoopTime_.addSample(std::chrono::microseconds(idle),
311         std::chrono::microseconds(busy));
312       maxLatencyLoopTime_.addSample(std::chrono::microseconds(idle),
313         std::chrono::microseconds(busy));
314
315       if (observer_) {
316         if (observerSampleCount_++ == observer_->getSampleRate()) {
317           observerSampleCount_ = 0;
318           observer_->loopSample(busy.count(), idle.count());
319         }
320       }
321
322       VLOG(11) << "EventBase "  << this         << " did not timeout " <<
323         " loop time guess: "    << (busy + idle).count()  <<
324         " idle time: "          << idle.count()         <<
325         " busy time: "          << busy.count()         <<
326         " avgLoopTime: "        << avgLoopTime_.get() <<
327         " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
328         " maxLatency_: "        << maxLatency_.count() << "us" <<
329         " notificationQueueSize: " << getNotificationQueueSize() <<
330         " nothingHandledYet(): " << nothingHandledYet();
331
332       // see if our average loop time has exceeded our limit
333       if ((maxLatency_ > std::chrono::microseconds::zero()) &&
334           (maxLatencyLoopTime_.get() > double(maxLatency_.count()))) {
335         maxLatencyCob_();
336         // back off temporarily -- don't keep spamming maxLatencyCob_
337         // if we're only a bit over the limit
338         maxLatencyLoopTime_.dampen(0.9);
339       }
340
341       // Our loop run did real work; reset the idle timer
342       idleStart = std::chrono::steady_clock::now();
343     } else {
344       VLOG(11) << "EventBase " << this << " did not timeout";
345     }
346
347     // If the event loop indicate that there were no more events, and
348     // we also didn't have any loop callbacks to run, there is nothing left to
349     // do.
350     if (res != 0 && !ranLoopCallbacks) {
351       // Since Notification Queue is marked 'internal' some events may not have
352       // run.  Run them manually if so, and continue looping.
353       //
354       if (getNotificationQueueSize() > 0) {
355         fnRunner_->handlerReady(0);
356       } else {
357         break;
358       }
359     }
360
361     if (enableTimeMeasurement_) {
362       VLOG(11) << "EventBase " << this << " loop time: " <<
363         getTimeDelta(&prev).count();
364     }
365
366     if (once) {
367       break;
368     }
369   }
370   // Reset stop_ so loop() can be called again
371   stop_ = false;
372
373   if (res < 0) {
374     LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
375     return false;
376   } else if (res == 1) {
377     VLOG(5) << "EventBase: ran out of events (exiting loop)!";
378   } else if (res > 1) {
379     LOG(ERROR) << "EventBase: unknown event loop result = " << res;
380     return false;
381   }
382
383   loopThread_.store({}, std::memory_order_release);
384
385   VLOG(5) << "EventBase(): Done with loop.";
386   return true;
387 }
388
389 ssize_t EventBase::loopKeepAliveCount() {
390   if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) {
391     loopKeepAliveCount_ +=
392         loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed);
393   }
394   DCHECK_GE(loopKeepAliveCount_, 0);
395
396   return loopKeepAliveCount_;
397 }
398
399 void EventBase::applyLoopKeepAlive() {
400   auto keepAliveCount = loopKeepAliveCount();
401   // Make sure default VirtualEventBase won't hold EventBase::loop() forever.
402   if (virtualEventBase_ && virtualEventBase_->keepAliveCount() == 1) {
403     --keepAliveCount;
404   }
405
406   if (loopKeepAliveActive_ && keepAliveCount == 0) {
407     // Restore the notification queue internal flag
408     fnRunner_->stopConsuming();
409     fnRunner_->startConsumingInternal(this, queue_.get());
410     loopKeepAliveActive_ = false;
411   } else if (!loopKeepAliveActive_ && keepAliveCount > 0) {
412     // Update the notification queue event to treat it as a normal
413     // (non-internal) event.  The notification queue event always remains
414     // installed, and the main loop won't exit with it installed.
415     fnRunner_->stopConsuming();
416     fnRunner_->startConsuming(this, queue_.get());
417     loopKeepAliveActive_ = true;
418   }
419 }
420
421 void EventBase::loopForever() {
422   bool ret;
423   {
424     SCOPE_EXIT {
425       applyLoopKeepAlive();
426     };
427     // Make sure notification queue events are treated as normal events.
428     // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
429     // released inside a loop.
430     ++loopKeepAliveCount_;
431     SCOPE_EXIT {
432       --loopKeepAliveCount_;
433     };
434     ret = loop();
435   }
436
437   if (!ret) {
438     folly::throwSystemError("error in EventBase::loopForever()");
439   }
440 }
441
442 void EventBase::bumpHandlingTime() {
443   if (!enableTimeMeasurement_) {
444     return;
445   }
446
447   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
448     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
449   if (nothingHandledYet()) {
450     latestLoopCnt_ = nextLoopCnt_;
451     // set the time
452     startWork_ = std::chrono::steady_clock::now();
453
454     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
455              << " (loop) startWork_ " << startWork_.time_since_epoch().count();
456   }
457 }
458
459 void EventBase::terminateLoopSoon() {
460   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
461
462   // Set stop to true, so the event loop will know to exit.
463   // TODO: We should really use an atomic operation here with a release
464   // barrier.
465   stop_ = true;
466
467   // Call event_base_loopbreak() so that libevent will exit the next time
468   // around the loop.
469   event_base_loopbreak(evb_);
470
471   // If terminateLoopSoon() is called from another thread,
472   // the EventBase thread might be stuck waiting for events.
473   // In this case, it won't wake up and notice that stop_ is set until it
474   // receives another event.  Send an empty frame to the notification queue
475   // so that the event loop will wake up even if there are no other events.
476   //
477   // We don't care about the return value of trySendFrame().  If it fails
478   // this likely means the EventBase already has lots of events waiting
479   // anyway.
480   try {
481     queue_->putMessage(nullptr);
482   } catch (...) {
483     // We don't care if putMessage() fails.  This likely means
484     // the EventBase already has lots of events waiting anyway.
485   }
486 }
487
488 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
489   DCHECK(isInEventBaseThread());
490   callback->cancelLoopCallback();
491   callback->context_ = RequestContext::saveContext();
492   if (runOnceCallbacks_ != nullptr && thisIteration) {
493     runOnceCallbacks_->push_back(*callback);
494   } else {
495     loopCallbacks_.push_back(*callback);
496   }
497 }
498
499 void EventBase::runInLoop(Func cob, bool thisIteration) {
500   DCHECK(isInEventBaseThread());
501   auto wrapper = new FunctionLoopCallback(std::move(cob));
502   wrapper->context_ = RequestContext::saveContext();
503   if (runOnceCallbacks_ != nullptr && thisIteration) {
504     runOnceCallbacks_->push_back(*wrapper);
505   } else {
506     loopCallbacks_.push_back(*wrapper);
507   }
508 }
509
510 void EventBase::runOnDestruction(LoopCallback* callback) {
511   std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
512   callback->cancelLoopCallback();
513   onDestructionCallbacks_.push_back(*callback);
514 }
515
516 void EventBase::runBeforeLoop(LoopCallback* callback) {
517   DCHECK(isInEventBaseThread());
518   callback->cancelLoopCallback();
519   runBeforeLoopCallbacks_.push_back(*callback);
520 }
521
522 bool EventBase::runInEventBaseThread(Func fn) {
523   // Send the message.
524   // It will be received by the FunctionRunner in the EventBase's thread.
525
526   // We try not to schedule nullptr callbacks
527   if (!fn) {
528     LOG(ERROR) << "EventBase " << this
529                << ": Scheduling nullptr callbacks is not allowed";
530     return false;
531   }
532
533   // Short-circuit if we are already in our event base
534   if (inRunningEventBaseThread()) {
535     runInLoop(std::move(fn));
536     return true;
537
538   }
539
540   try {
541     queue_->putMessage(std::move(fn));
542   } catch (const std::exception& ex) {
543     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
544                << "for EventBase thread: " << ex.what();
545     return false;
546   }
547
548   return true;
549 }
550
551 bool EventBase::runInEventBaseThreadAndWait(FuncRef fn) {
552   if (inRunningEventBaseThread()) {
553     LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
554                << "allowed";
555     return false;
556   }
557
558   bool ready = false;
559   std::mutex m;
560   std::condition_variable cv;
561   runInEventBaseThread([&] {
562       SCOPE_EXIT {
563         std::unique_lock<std::mutex> l(m);
564         ready = true;
565         cv.notify_one();
566         // We cannot release the lock before notify_one, because a spurious
567         // wakeup in the waiting thread may lead to cv and m going out of scope
568         // prematurely.
569       };
570       fn();
571   });
572   std::unique_lock<std::mutex> l(m);
573   cv.wait(l, [&] { return ready; });
574
575   return true;
576 }
577
578 bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(FuncRef fn) {
579   if (isInEventBaseThread()) {
580     fn();
581     return true;
582   } else {
583     return runInEventBaseThreadAndWait(std::move(fn));
584   }
585 }
586
587 bool EventBase::runLoopCallbacks() {
588   if (!loopCallbacks_.empty()) {
589     bumpHandlingTime();
590     // Swap the loopCallbacks_ list with a temporary list on our stack.
591     // This way we will only run callbacks scheduled at the time
592     // runLoopCallbacks() was invoked.
593     //
594     // If any of these callbacks in turn call runInLoop() to schedule more
595     // callbacks, those new callbacks won't be run until the next iteration
596     // around the event loop.  This prevents runInLoop() callbacks from being
597     // able to start file descriptor and timeout based events.
598     LoopCallbackList currentCallbacks;
599     currentCallbacks.swap(loopCallbacks_);
600     runOnceCallbacks_ = &currentCallbacks;
601
602     while (!currentCallbacks.empty()) {
603       LoopCallback* callback = &currentCallbacks.front();
604       currentCallbacks.pop_front();
605       folly::RequestContextScopeGuard rctx(callback->context_);
606       callback->runLoopCallback();
607     }
608
609     runOnceCallbacks_ = nullptr;
610     return true;
611   }
612   return false;
613 }
614
615 void EventBase::initNotificationQueue() {
616   // Infinite size queue
617   queue_.reset(new NotificationQueue<Func>());
618
619   // We allocate fnRunner_ separately, rather than declaring it directly
620   // as a member of EventBase solely so that we don't need to include
621   // NotificationQueue.h from EventBase.h
622   fnRunner_.reset(new FunctionRunner());
623
624   // Mark this as an internal event, so event_base_loop() will return if
625   // there are no other events besides this one installed.
626   //
627   // Most callers don't care about the internal notification queue used by
628   // EventBase.  The queue is always installed, so if we did count the queue as
629   // an active event, loop() would never exit with no more events to process.
630   // Users can use loopForever() if they do care about the notification queue.
631   // (This is useful for EventBase threads that do nothing but process
632   // runInEventBaseThread() notifications.)
633   fnRunner_->startConsumingInternal(this, queue_.get());
634 }
635
636 void EventBase::SmoothLoopTime::setTimeInterval(
637     std::chrono::microseconds timeInterval) {
638   expCoeff_ = -1.0 / timeInterval.count();
639   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
640 }
641
642 void EventBase::SmoothLoopTime::reset(double value) {
643   value_ = value;
644 }
645
646 void EventBase::SmoothLoopTime::addSample(
647     std::chrono::microseconds idle,
648     std::chrono::microseconds busy) {
649   /*
650    * Position at which the busy sample is considered to be taken.
651    * (Allows to quickly skew our average without editing much code)
652    */
653   enum BusySamplePosition {
654     RIGHT = 0, // busy sample placed at the end of the iteration
655     CENTER = 1, // busy sample placed at the middle point of the iteration
656     LEFT = 2, // busy sample placed at the beginning of the iteration
657   };
658
659   // See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
660   // and D676020 for more info on this calculation.
661   VLOG(11) << "idle " << idle.count() << " oldBusyLeftover_ "
662            << oldBusyLeftover_.count() << " idle + oldBusyLeftover_ "
663            << (idle + oldBusyLeftover_).count() << " busy " << busy.count()
664            << " " << __PRETTY_FUNCTION__;
665   idle += oldBusyLeftover_ + busy;
666   oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
667   idle -= oldBusyLeftover_;
668
669   double coeff = exp(idle.count() * expCoeff_);
670   value_ *= coeff;
671   value_ += (1.0 - coeff) * busy.count();
672 }
673
674 bool EventBase::nothingHandledYet() const noexcept {
675   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
676   return (nextLoopCnt_ != latestLoopCnt_);
677 }
678
679 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
680                                       InternalEnum internal) {
681
682   struct event* ev = obj->getEvent();
683   assert(ev->ev_base == nullptr);
684
685   event_base_set(getLibeventBase(), ev);
686   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
687     // Set the EVLIST_INTERNAL flag
688     event_ref_flags(ev) |= EVLIST_INTERNAL;
689   }
690 }
691
692 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
693   cancelTimeout(obj);
694   struct event* ev = obj->getEvent();
695   ev->ev_base = nullptr;
696 }
697
698 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
699                                  TimeoutManager::timeout_type timeout) {
700   assert(isInEventBaseThread());
701   // Set up the timeval and add the event
702   struct timeval tv;
703   tv.tv_sec = long(timeout.count() / 1000LL);
704   tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL);
705
706   struct event* ev = obj->getEvent();
707   if (event_add(ev, &tv) < 0) {
708     LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
709     return false;
710   }
711
712   return true;
713 }
714
715 void EventBase::cancelTimeout(AsyncTimeout* obj) {
716   assert(isInEventBaseThread());
717   struct event* ev = obj->getEvent();
718   if (EventUtil::isEventRegistered(ev)) {
719     event_del(ev);
720   }
721 }
722
723 void EventBase::setName(const std::string& name) {
724   assert(isInEventBaseThread());
725   name_ = name;
726
727   if (isRunning()) {
728     setThreadName(loopThread_.load(std::memory_order_relaxed),
729                   name_);
730   }
731 }
732
733 const std::string& EventBase::getName() {
734   assert(isInEventBaseThread());
735   return name_;
736 }
737
738 const char* EventBase::getLibeventVersion() { return event_get_version(); }
739 const char* EventBase::getLibeventMethod() { return event_get_method(); }
740
741 VirtualEventBase& EventBase::getVirtualEventBase() {
742   folly::call_once(virtualEventBaseInitFlag_, [&] {
743     virtualEventBase_ = folly::make_unique<VirtualEventBase>(*this);
744   });
745
746   return *virtualEventBase_;
747 }
748
749 } // folly