Loop Time
[folly.git] / folly / io / async / EventBase.cpp
1 /*
2  * Copyright 2014-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef __STDC_FORMAT_MACROS
17 #define __STDC_FORMAT_MACROS
18 #endif
19
20 #include <folly/io/async/EventBase.h>
21
22 #include <fcntl.h>
23
24 #include <memory>
25 #include <mutex>
26 #include <thread>
27
28 #include <folly/Memory.h>
29 #include <folly/io/async/NotificationQueue.h>
30 #include <folly/io/async/VirtualEventBase.h>
31 #include <folly/portability/Unistd.h>
32 #include <folly/synchronization/Baton.h>
33 #include <folly/system/ThreadName.h>
34
35 namespace folly {
36
37 /*
38  * EventBase::FunctionRunner
39  */
40
41 class EventBase::FunctionRunner
42     : public NotificationQueue<EventBase::Func>::Consumer {
43  public:
44   void messageAvailable(Func&& msg) noexcept override {
45     // In libevent2, internal events do not break the loop.
46     // Most users would expect loop(), followed by runInEventBaseThread(),
47     // to break the loop and check if it should exit or not.
48     // To have similar bejaviour to libevent1.4, tell the loop to break here.
49     // Note that loop() may still continue to loop, but it will also check the
50     // stop_ flag as well as runInLoop callbacks, etc.
51     event_base_loopbreak(getEventBase()->evb_);
52
53     if (!msg) {
54       // terminateLoopSoon() sends a null message just to
55       // wake up the loop.  We can ignore these messages.
56       return;
57     }
58     msg();
59   }
60 };
61
62 // The interface used to libevent is not thread-safe.  Calls to
63 // event_init() and event_base_free() directly modify an internal
64 // global 'current_base', so a mutex is required to protect this.
65 //
66 // event_init() should only ever be called once.  Subsequent calls
67 // should be made to event_base_new().  We can recognise that
68 // event_init() has already been called by simply inspecting current_base.
69 static std::mutex libevent_mutex_;
70
71 /*
72  * EventBase methods
73  */
74
75 EventBase::EventBase(bool enableTimeMeasurement)
76   : runOnceCallbacks_(nullptr)
77   , stop_(false)
78   , loopThread_()
79   , queue_(nullptr)
80   , fnRunner_(nullptr)
81   , maxLatency_(0)
82   , avgLoopTime_(std::chrono::seconds(2))
83   , maxLatencyLoopTime_(avgLoopTime_)
84   , enableTimeMeasurement_(enableTimeMeasurement)
85   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
86   , latestLoopCnt_(nextLoopCnt_)
87   , startWork_()
88   , observer_(nullptr)
89   , observerSampleCount_(0)
90   , executionObserver_(nullptr) {
91   struct event ev;
92   {
93     std::lock_guard<std::mutex> lock(libevent_mutex_);
94
95     // The value 'current_base' (libevent 1) or
96     // 'event_global_current_base_' (libevent 2) is filled in by event_set(),
97     // allowing examination of its value without an explicit reference here.
98     // If ev.ev_base is nullptr, then event_init() must be called, otherwise
99     // call event_base_new().
100     event_set(&ev, 0, 0, nullptr, nullptr);
101     if (!ev.ev_base) {
102       evb_ = event_init();
103     }
104   }
105
106   if (ev.ev_base) {
107     evb_ = event_base_new();
108   }
109
110   if (UNLIKELY(evb_ == nullptr)) {
111     LOG(ERROR) << "EventBase(): Failed to init event base.";
112     folly::throwSystemError("error in EventBase::EventBase()");
113   }
114   VLOG(5) << "EventBase(): Created.";
115   initNotificationQueue();
116 }
117
118 // takes ownership of the event_base
119 EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
120   : runOnceCallbacks_(nullptr)
121   , stop_(false)
122   , loopThread_()
123   , evb_(evb)
124   , queue_(nullptr)
125   , fnRunner_(nullptr)
126   , maxLatency_(0)
127   , avgLoopTime_(std::chrono::seconds(2))
128   , maxLatencyLoopTime_(avgLoopTime_)
129   , enableTimeMeasurement_(enableTimeMeasurement)
130   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
131   , latestLoopCnt_(nextLoopCnt_)
132   , startWork_()
133   , observer_(nullptr)
134   , observerSampleCount_(0)
135   , executionObserver_(nullptr) {
136   if (UNLIKELY(evb_ == nullptr)) {
137     LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
138     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
139   }
140   initNotificationQueue();
141 }
142
143 EventBase::~EventBase() {
144   std::future<void> virtualEventBaseDestroyFuture;
145   if (virtualEventBase_) {
146     virtualEventBaseDestroyFuture = virtualEventBase_->destroy();
147   }
148
149   // Keep looping until all keep-alive handles are released. Each keep-alive
150   // handle signals that some external code will still schedule some work on
151   // this EventBase (so it's not safe to destroy it).
152   while (loopKeepAliveCount() > 0) {
153     applyLoopKeepAlive();
154     loopOnce();
155   }
156
157   if (virtualEventBaseDestroyFuture.valid()) {
158     virtualEventBaseDestroyFuture.get();
159   }
160
161   // Call all destruction callbacks, before we start cleaning up our state.
162   while (!onDestructionCallbacks_.empty()) {
163     LoopCallback* callback = &onDestructionCallbacks_.front();
164     onDestructionCallbacks_.pop_front();
165     callback->runLoopCallback();
166   }
167
168   clearCobTimeouts();
169
170   DCHECK_EQ(0u, runBeforeLoopCallbacks_.size());
171
172   (void)runLoopCallbacks();
173
174   if (!fnRunner_->consumeUntilDrained()) {
175     LOG(ERROR) << "~EventBase(): Unable to drain notification queue";
176   }
177
178   // Stop consumer before deleting NotificationQueue
179   fnRunner_->stopConsuming();
180   {
181     std::lock_guard<std::mutex> lock(libevent_mutex_);
182     event_base_free(evb_);
183   }
184
185   for (auto storage : localStorageToDtor_) {
186     storage->onEventBaseDestruction(*this);
187   }
188
189   VLOG(5) << "EventBase(): Destroyed.";
190 }
191
192 size_t EventBase::getNotificationQueueSize() const {
193   return queue_->size();
194 }
195
196 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
197   fnRunner_->setMaxReadAtOnce(maxAtOnce);
198 }
199
200 void EventBase::checkIsInEventBaseThread() const {
201   auto evbTid = loopThread_.load(std::memory_order_relaxed);
202   if (evbTid == std::thread::id()) {
203     return;
204   }
205
206   // Using getThreadName(evbTid) instead of name_ will work also if
207   // the thread name is set outside of EventBase (and name_ is empty).
208   auto curTid = std::this_thread::get_id();
209   CHECK(evbTid == curTid)
210       << "This logic must be executed in the event base thread. "
211       << "Event base thread name: \""
212       << folly::getThreadName(evbTid).value_or("")
213       << "\", current thread name: \""
214       << folly::getThreadName(curTid).value_or("") << "\"";
215 }
216
217 // Set smoothing coefficient for loop load average; input is # of milliseconds
218 // for exp(-1) decay.
219 void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
220   assert(enableTimeMeasurement_);
221   std::chrono::microseconds us = std::chrono::milliseconds(ms);
222   if (ms > std::chrono::milliseconds::zero()) {
223     maxLatencyLoopTime_.setTimeInterval(us);
224     avgLoopTime_.setTimeInterval(us);
225   } else {
226     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
227   }
228 }
229
230 void EventBase::resetLoadAvg(double value) {
231   assert(enableTimeMeasurement_);
232   avgLoopTime_.reset(value);
233   maxLatencyLoopTime_.reset(value);
234 }
235
236 static std::chrono::milliseconds
237 getTimeDelta(std::chrono::steady_clock::time_point* prev) {
238   auto result = std::chrono::steady_clock::now() - *prev;
239   *prev = std::chrono::steady_clock::now();
240
241   return std::chrono::duration_cast<std::chrono::milliseconds>(result);
242 }
243
244 void EventBase::waitUntilRunning() {
245   while (!isRunning()) {
246     std::this_thread::yield();
247   }
248 }
249
250 // enters the event_base loop -- will only exit when forced to
251 bool EventBase::loop() {
252   return loopBody();
253 }
254
255 bool EventBase::loopOnce(int flags) {
256   return loopBody(flags | EVLOOP_ONCE);
257 }
258
259 bool EventBase::loopBody(int flags) {
260   VLOG(5) << "EventBase(): Starting loop.";
261
262   DCHECK(!invokingLoop_)
263       << "Your code just tried to loop over an event base from inside another "
264       << "event base loop. Since libevent is not reentrant, this leads to "
265       << "undefined behavior in opt builds. Please fix immediately. For the "
266       << "common case of an inner function that needs to do some synchronous "
267       << "computation on an event-base, replace getEventBase() by a new, "
268       << "stack-allocated EvenBase.";
269   invokingLoop_ = true;
270   SCOPE_EXIT {
271     invokingLoop_ = false;
272   };
273
274   int res = 0;
275   bool ranLoopCallbacks;
276   bool blocking = !(flags & EVLOOP_NONBLOCK);
277   bool once = (flags & EVLOOP_ONCE);
278
279   // time-measurement variables.
280   std::chrono::steady_clock::time_point prev;
281   std::chrono::steady_clock::time_point idleStart = {};
282   std::chrono::microseconds busy;
283   std::chrono::microseconds idle;
284
285   loopThread_.store(std::this_thread::get_id(), std::memory_order_release);
286
287   if (!name_.empty()) {
288     setThreadName(name_);
289   }
290
291   if (enableTimeMeasurement_) {
292     prev = std::chrono::steady_clock::now();
293     idleStart = prev;
294   }
295
296   while (!stop_.load(std::memory_order_relaxed)) {
297     applyLoopKeepAlive();
298     ++nextLoopCnt_;
299
300     // Run the before loop callbacks
301     LoopCallbackList callbacks;
302     callbacks.swap(runBeforeLoopCallbacks_);
303
304     while(!callbacks.empty()) {
305       auto* item = &callbacks.front();
306       callbacks.pop_front();
307       item->runLoopCallback();
308     }
309
310     // nobody can add loop callbacks from within this thread if
311     // we don't have to handle anything to start with...
312     if (blocking && loopCallbacks_.empty()) {
313       res = event_base_loop(evb_, EVLOOP_ONCE);
314     } else {
315       res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
316     }
317
318     ranLoopCallbacks = runLoopCallbacks();
319
320     if (enableTimeMeasurement_) {
321       auto now = std::chrono::steady_clock::now();
322       busy = std::chrono::duration_cast<std::chrono::microseconds>(
323           now - startWork_);
324       idle = std::chrono::duration_cast<std::chrono::microseconds>(
325           startWork_ - idleStart);
326       auto loop_time = busy + idle;
327
328       avgLoopTime_.addSample(loop_time, busy);
329       maxLatencyLoopTime_.addSample(loop_time, busy);
330
331       if (observer_) {
332         if (observerSampleCount_++ == observer_->getSampleRate()) {
333           observerSampleCount_ = 0;
334           observer_->loopSample(busy.count(), idle.count());
335         }
336       }
337
338       VLOG(11) << "EventBase " << this << " did not timeout "
339                << " loop time guess: " << loop_time.count()
340                << " idle time: " << idle.count()
341                << " busy time: " << busy.count()
342                << " avgLoopTime: " << avgLoopTime_.get()
343                << " maxLatencyLoopTime: " << maxLatencyLoopTime_.get()
344                << " maxLatency_: " << maxLatency_.count() << "us"
345                << " notificationQueueSize: " << getNotificationQueueSize()
346                << " nothingHandledYet(): " << nothingHandledYet();
347
348       // see if our average loop time has exceeded our limit
349       if ((maxLatency_ > std::chrono::microseconds::zero()) &&
350           (maxLatencyLoopTime_.get() > double(maxLatency_.count()))) {
351         maxLatencyCob_();
352         // back off temporarily -- don't keep spamming maxLatencyCob_
353         // if we're only a bit over the limit
354         maxLatencyLoopTime_.dampen(0.9);
355       }
356
357       // Our loop run did real work; reset the idle timer
358       idleStart = now;
359     } else {
360       VLOG(11) << "EventBase " << this << " did not timeout";
361     }
362
363     // If the event loop indicate that there were no more events, and
364     // we also didn't have any loop callbacks to run, there is nothing left to
365     // do.
366     if (res != 0 && !ranLoopCallbacks) {
367       // Since Notification Queue is marked 'internal' some events may not have
368       // run.  Run them manually if so, and continue looping.
369       //
370       if (getNotificationQueueSize() > 0) {
371         fnRunner_->handlerReady(0);
372       } else {
373         break;
374       }
375     }
376
377     if (enableTimeMeasurement_) {
378       VLOG(11) << "EventBase " << this << " loop time: " <<
379         getTimeDelta(&prev).count();
380     }
381
382     if (once) {
383       break;
384     }
385   }
386   // Reset stop_ so loop() can be called again
387   stop_.store(false, std::memory_order_relaxed);
388
389   if (res < 0) {
390     LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
391     return false;
392   } else if (res == 1) {
393     VLOG(5) << "EventBase: ran out of events (exiting loop)!";
394   } else if (res > 1) {
395     LOG(ERROR) << "EventBase: unknown event loop result = " << res;
396     return false;
397   }
398
399   loopThread_.store({}, std::memory_order_release);
400
401   VLOG(5) << "EventBase(): Done with loop.";
402   return true;
403 }
404
405 ssize_t EventBase::loopKeepAliveCount() {
406   if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) {
407     loopKeepAliveCount_ +=
408         loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed);
409   }
410   DCHECK_GE(loopKeepAliveCount_, 0);
411
412   return loopKeepAliveCount_;
413 }
414
415 void EventBase::applyLoopKeepAlive() {
416   auto keepAliveCount = loopKeepAliveCount();
417   // Make sure default VirtualEventBase won't hold EventBase::loop() forever.
418   if (virtualEventBase_ && virtualEventBase_->keepAliveCount() == 1) {
419     --keepAliveCount;
420   }
421
422   if (loopKeepAliveActive_ && keepAliveCount == 0) {
423     // Restore the notification queue internal flag
424     fnRunner_->stopConsuming();
425     fnRunner_->startConsumingInternal(this, queue_.get());
426     loopKeepAliveActive_ = false;
427   } else if (!loopKeepAliveActive_ && keepAliveCount > 0) {
428     // Update the notification queue event to treat it as a normal
429     // (non-internal) event.  The notification queue event always remains
430     // installed, and the main loop won't exit with it installed.
431     fnRunner_->stopConsuming();
432     fnRunner_->startConsuming(this, queue_.get());
433     loopKeepAliveActive_ = true;
434   }
435 }
436
437 void EventBase::loopForever() {
438   bool ret;
439   {
440     SCOPE_EXIT {
441       applyLoopKeepAlive();
442     };
443     // Make sure notification queue events are treated as normal events.
444     // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
445     // released inside a loop.
446     ++loopKeepAliveCount_;
447     SCOPE_EXIT {
448       --loopKeepAliveCount_;
449     };
450     ret = loop();
451   }
452
453   if (!ret) {
454     folly::throwSystemError("error in EventBase::loopForever()");
455   }
456 }
457
458 void EventBase::bumpHandlingTime() {
459   if (!enableTimeMeasurement_) {
460     return;
461   }
462
463   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
464     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
465   if (nothingHandledYet()) {
466     latestLoopCnt_ = nextLoopCnt_;
467     // set the time
468     startWork_ = std::chrono::steady_clock::now();
469
470     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
471              << " (loop) startWork_ " << startWork_.time_since_epoch().count();
472   }
473 }
474
475 void EventBase::terminateLoopSoon() {
476   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
477
478   // Set stop to true, so the event loop will know to exit.
479   stop_.store(true, std::memory_order_relaxed);
480
481   // Call event_base_loopbreak() so that libevent will exit the next time
482   // around the loop.
483   event_base_loopbreak(evb_);
484
485   // If terminateLoopSoon() is called from another thread,
486   // the EventBase thread might be stuck waiting for events.
487   // In this case, it won't wake up and notice that stop_ is set until it
488   // receives another event.  Send an empty frame to the notification queue
489   // so that the event loop will wake up even if there are no other events.
490   //
491   // We don't care about the return value of trySendFrame().  If it fails
492   // this likely means the EventBase already has lots of events waiting
493   // anyway.
494   try {
495     queue_->putMessage(nullptr);
496   } catch (...) {
497     // We don't care if putMessage() fails.  This likely means
498     // the EventBase already has lots of events waiting anyway.
499   }
500 }
501
502 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
503   dcheckIsInEventBaseThread();
504   callback->cancelLoopCallback();
505   callback->context_ = RequestContext::saveContext();
506   if (runOnceCallbacks_ != nullptr && thisIteration) {
507     runOnceCallbacks_->push_back(*callback);
508   } else {
509     loopCallbacks_.push_back(*callback);
510   }
511 }
512
513 void EventBase::runInLoop(Func cob, bool thisIteration) {
514   dcheckIsInEventBaseThread();
515   auto wrapper = new FunctionLoopCallback(std::move(cob));
516   wrapper->context_ = RequestContext::saveContext();
517   if (runOnceCallbacks_ != nullptr && thisIteration) {
518     runOnceCallbacks_->push_back(*wrapper);
519   } else {
520     loopCallbacks_.push_back(*wrapper);
521   }
522 }
523
524 void EventBase::runOnDestruction(LoopCallback* callback) {
525   std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
526   callback->cancelLoopCallback();
527   onDestructionCallbacks_.push_back(*callback);
528 }
529
530 void EventBase::runBeforeLoop(LoopCallback* callback) {
531   dcheckIsInEventBaseThread();
532   callback->cancelLoopCallback();
533   runBeforeLoopCallbacks_.push_back(*callback);
534 }
535
536 bool EventBase::runInEventBaseThread(Func fn) {
537   // Send the message.
538   // It will be received by the FunctionRunner in the EventBase's thread.
539
540   // We try not to schedule nullptr callbacks
541   if (!fn) {
542     LOG(ERROR) << "EventBase " << this
543                << ": Scheduling nullptr callbacks is not allowed";
544     return false;
545   }
546
547   // Short-circuit if we are already in our event base
548   if (inRunningEventBaseThread()) {
549     runInLoop(std::move(fn));
550     return true;
551   }
552
553   try {
554     queue_->putMessage(std::move(fn));
555   } catch (const std::exception& ex) {
556     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
557                << "for EventBase thread: " << ex.what();
558     return false;
559   }
560
561   return true;
562 }
563
564 bool EventBase::runInEventBaseThreadAndWait(Func fn) {
565   if (inRunningEventBaseThread()) {
566     LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
567                << "allowed";
568     return false;
569   }
570
571   Baton<> ready;
572   runInEventBaseThread([&ready, fn = std::move(fn)]() mutable {
573     SCOPE_EXIT {
574       ready.post();
575     };
576     // A trick to force the stored functor to be executed and then destructed
577     // before posting the baton and waking the waiting thread.
578     copy(std::move(fn))();
579   });
580   ready.wait();
581
582   return true;
583 }
584
585 bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) {
586   if (isInEventBaseThread()) {
587     fn();
588     return true;
589   } else {
590     return runInEventBaseThreadAndWait(std::move(fn));
591   }
592 }
593
594 bool EventBase::runLoopCallbacks() {
595   if (!loopCallbacks_.empty()) {
596     bumpHandlingTime();
597     // Swap the loopCallbacks_ list with a temporary list on our stack.
598     // This way we will only run callbacks scheduled at the time
599     // runLoopCallbacks() was invoked.
600     //
601     // If any of these callbacks in turn call runInLoop() to schedule more
602     // callbacks, those new callbacks won't be run until the next iteration
603     // around the event loop.  This prevents runInLoop() callbacks from being
604     // able to start file descriptor and timeout based events.
605     LoopCallbackList currentCallbacks;
606     currentCallbacks.swap(loopCallbacks_);
607     runOnceCallbacks_ = &currentCallbacks;
608
609     while (!currentCallbacks.empty()) {
610       LoopCallback* callback = &currentCallbacks.front();
611       currentCallbacks.pop_front();
612       folly::RequestContextScopeGuard rctx(std::move(callback->context_));
613       callback->runLoopCallback();
614     }
615
616     runOnceCallbacks_ = nullptr;
617     return true;
618   }
619   return false;
620 }
621
622 void EventBase::initNotificationQueue() {
623   // Infinite size queue
624   queue_ = std::make_unique<NotificationQueue<Func>>();
625
626   // We allocate fnRunner_ separately, rather than declaring it directly
627   // as a member of EventBase solely so that we don't need to include
628   // NotificationQueue.h from EventBase.h
629   fnRunner_ = std::make_unique<FunctionRunner>();
630
631   // Mark this as an internal event, so event_base_loop() will return if
632   // there are no other events besides this one installed.
633   //
634   // Most callers don't care about the internal notification queue used by
635   // EventBase.  The queue is always installed, so if we did count the queue as
636   // an active event, loop() would never exit with no more events to process.
637   // Users can use loopForever() if they do care about the notification queue.
638   // (This is useful for EventBase threads that do nothing but process
639   // runInEventBaseThread() notifications.)
640   fnRunner_->startConsumingInternal(this, queue_.get());
641 }
642
643 void EventBase::SmoothLoopTime::setTimeInterval(
644     std::chrono::microseconds timeInterval) {
645   expCoeff_ = -1.0 / timeInterval.count();
646   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
647 }
648
649 void EventBase::SmoothLoopTime::reset(double value) {
650   value_ = value;
651 }
652
653 void EventBase::SmoothLoopTime::addSample(
654     std::chrono::microseconds total,
655     std::chrono::microseconds busy) {
656   if ((buffer_time_ + total) > buffer_interval_ && buffer_cnt_ > 0) {
657     // See https://en.wikipedia.org/wiki/Exponential_smoothing for
658     // more info on this calculation.
659     double coeff = exp(buffer_time_.count() * expCoeff_);
660     value_ =
661         value_ * coeff + (1.0 - coeff) * (busy_buffer_.count() / buffer_cnt_);
662     buffer_time_ = std::chrono::microseconds{0};
663     busy_buffer_ = std::chrono::microseconds{0};
664     buffer_cnt_ = 0;
665   }
666   buffer_time_ += total;
667   busy_buffer_ += busy;
668   buffer_cnt_++;
669 }
670
671 bool EventBase::nothingHandledYet() const noexcept {
672   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
673   return (nextLoopCnt_ != latestLoopCnt_);
674 }
675
676 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
677                                       InternalEnum internal) {
678
679   struct event* ev = obj->getEvent();
680   assert(ev->ev_base == nullptr);
681
682   event_base_set(getLibeventBase(), ev);
683   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
684     // Set the EVLIST_INTERNAL flag
685     event_ref_flags(ev) |= EVLIST_INTERNAL;
686   }
687 }
688
689 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
690   cancelTimeout(obj);
691   struct event* ev = obj->getEvent();
692   ev->ev_base = nullptr;
693 }
694
695 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
696                                  TimeoutManager::timeout_type timeout) {
697   dcheckIsInEventBaseThread();
698   // Set up the timeval and add the event
699   struct timeval tv;
700   tv.tv_sec = long(timeout.count() / 1000LL);
701   tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL);
702
703   struct event* ev = obj->getEvent();
704   if (event_add(ev, &tv) < 0) {
705     LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
706     return false;
707   }
708
709   return true;
710 }
711
712 void EventBase::cancelTimeout(AsyncTimeout* obj) {
713   dcheckIsInEventBaseThread();
714   struct event* ev = obj->getEvent();
715   if (EventUtil::isEventRegistered(ev)) {
716     event_del(ev);
717   }
718 }
719
720 void EventBase::setName(const std::string& name) {
721   dcheckIsInEventBaseThread();
722   name_ = name;
723
724   if (isRunning()) {
725     setThreadName(loopThread_.load(std::memory_order_relaxed),
726                   name_);
727   }
728 }
729
730 const std::string& EventBase::getName() {
731   dcheckIsInEventBaseThread();
732   return name_;
733 }
734
735 const char* EventBase::getLibeventVersion() { return event_get_version(); }
736 const char* EventBase::getLibeventMethod() { return event_get_method(); }
737
738 VirtualEventBase& EventBase::getVirtualEventBase() {
739   folly::call_once(virtualEventBaseInitFlag_, [&] {
740     virtualEventBase_ = std::make_unique<VirtualEventBase>(*this);
741   });
742
743   return *virtualEventBase_;
744 }
745
746 constexpr std::chrono::milliseconds EventBase::SmoothLoopTime::buffer_interval_;
747 } // namespace folly