Make most implicit integer truncations and sign conversions explicit
[folly.git] / folly / io / async / EventBase.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include <folly/io/async/EventBase.h>
22
23 #include <folly/ThreadName.h>
24 #include <folly/io/async/NotificationQueue.h>
25 #include <folly/portability/Unistd.h>
26
27 #include <condition_variable>
28 #include <fcntl.h>
29 #include <mutex>
30
31 namespace folly {
32
33 /*
34  * EventBase::FunctionRunner
35  */
36
37 class EventBase::FunctionRunner
38     : public NotificationQueue<EventBase::Func>::Consumer {
39  public:
40   void messageAvailable(Func&& msg) override {
41     // In libevent2, internal events do not break the loop.
42     // Most users would expect loop(), followed by runInEventBaseThread(),
43     // to break the loop and check if it should exit or not.
44     // To have similar bejaviour to libevent1.4, tell the loop to break here.
45     // Note that loop() may still continue to loop, but it will also check the
46     // stop_ flag as well as runInLoop callbacks, etc.
47     event_base_loopbreak(getEventBase()->evb_);
48
49     if (!msg) {
50       // terminateLoopSoon() sends a null message just to
51       // wake up the loop.  We can ignore these messages.
52       return;
53     }
54
55     // The function should never throw an exception, because we have no
56     // way of knowing what sort of error handling to perform.
57     //
58     // If it does throw, log a message and abort the program.
59     try {
60       msg();
61     } catch (const std::exception& ex) {
62       LOG(ERROR) << "runInEventBaseThread() function threw a "
63                  << typeid(ex).name() << " exception: " << ex.what();
64       abort();
65     } catch (...) {
66       LOG(ERROR) << "runInEventBaseThread() function threw an exception";
67       abort();
68     }
69   }
70 };
71
72 // The interface used to libevent is not thread-safe.  Calls to
73 // event_init() and event_base_free() directly modify an internal
74 // global 'current_base', so a mutex is required to protect this.
75 //
76 // event_init() should only ever be called once.  Subsequent calls
77 // should be made to event_base_new().  We can recognise that
78 // event_init() has already been called by simply inspecting current_base.
79 static std::mutex libevent_mutex_;
80
81 /*
82  * EventBase methods
83  */
84
85 EventBase::EventBase(bool enableTimeMeasurement)
86   : runOnceCallbacks_(nullptr)
87   , stop_(false)
88   , loopThread_()
89   , queue_(nullptr)
90   , fnRunner_(nullptr)
91   , maxLatency_(0)
92   , avgLoopTime_(std::chrono::seconds(2))
93   , maxLatencyLoopTime_(avgLoopTime_)
94   , enableTimeMeasurement_(enableTimeMeasurement)
95   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
96   , latestLoopCnt_(nextLoopCnt_)
97   , startWork_()
98   , observer_(nullptr)
99   , observerSampleCount_(0)
100   , executionObserver_(nullptr) {
101   struct event ev;
102   {
103     std::lock_guard<std::mutex> lock(libevent_mutex_);
104
105     // The value 'current_base' (libevent 1) or
106     // 'event_global_current_base_' (libevent 2) is filled in by event_set(),
107     // allowing examination of its value without an explicit reference here.
108     // If ev.ev_base is NULL, then event_init() must be called, otherwise
109     // call event_base_new().
110     event_set(&ev, 0, 0, nullptr, nullptr);
111     if (!ev.ev_base) {
112       evb_ = event_init();
113     }
114   }
115
116   if (ev.ev_base) {
117     evb_ = event_base_new();
118   }
119
120   if (UNLIKELY(evb_ == nullptr)) {
121     LOG(ERROR) << "EventBase(): Failed to init event base.";
122     folly::throwSystemError("error in EventBase::EventBase()");
123   }
124   VLOG(5) << "EventBase(): Created.";
125   initNotificationQueue();
126   RequestContext::saveContext();
127 }
128
129 // takes ownership of the event_base
130 EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
131   : runOnceCallbacks_(nullptr)
132   , stop_(false)
133   , loopThread_()
134   , evb_(evb)
135   , queue_(nullptr)
136   , fnRunner_(nullptr)
137   , maxLatency_(0)
138   , avgLoopTime_(std::chrono::seconds(2))
139   , maxLatencyLoopTime_(avgLoopTime_)
140   , enableTimeMeasurement_(enableTimeMeasurement)
141   , nextLoopCnt_(uint64_t(-40)) // Early wrap-around so bugs will manifest soon
142   , latestLoopCnt_(nextLoopCnt_)
143   , startWork_()
144   , observer_(nullptr)
145   , observerSampleCount_(0)
146   , executionObserver_(nullptr) {
147   if (UNLIKELY(evb_ == nullptr)) {
148     LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
149     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
150   }
151   initNotificationQueue();
152   RequestContext::saveContext();
153 }
154
155 EventBase::~EventBase() {
156   // Keep looping until all keep-alive handles are released. Each keep-alive
157   // handle signals that some external code will still schedule some work on
158   // this EventBase (so it's not safe to destroy it).
159   while (loopKeepAliveCount() > 0) {
160     applyLoopKeepAlive();
161     loopOnce();
162   }
163
164   // Call all destruction callbacks, before we start cleaning up our state.
165   while (!onDestructionCallbacks_.empty()) {
166     LoopCallback* callback = &onDestructionCallbacks_.front();
167     onDestructionCallbacks_.pop_front();
168     callback->runLoopCallback();
169   }
170
171   clearCobTimeouts();
172
173   DCHECK_EQ(0u, runBeforeLoopCallbacks_.size());
174
175   (void)runLoopCallbacks();
176
177   if (!fnRunner_->consumeUntilDrained()) {
178     LOG(ERROR) << "~EventBase(): Unable to drain notification queue";
179   }
180
181   // Stop consumer before deleting NotificationQueue
182   fnRunner_->stopConsuming();
183   {
184     std::lock_guard<std::mutex> lock(libevent_mutex_);
185     event_base_free(evb_);
186   }
187
188   {
189     std::lock_guard<std::mutex> lock(localStorageMutex_);
190     for (auto storage : localStorageToDtor_) {
191       storage->onEventBaseDestruction(*this);
192     }
193   }
194   VLOG(5) << "EventBase(): Destroyed.";
195 }
196
197 size_t EventBase::getNotificationQueueSize() const {
198   return queue_->size();
199 }
200
201 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
202   fnRunner_->setMaxReadAtOnce(maxAtOnce);
203 }
204
205 // Set smoothing coefficient for loop load average; input is # of milliseconds
206 // for exp(-1) decay.
207 void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
208   assert(enableTimeMeasurement_);
209   std::chrono::microseconds us = std::chrono::milliseconds(ms);
210   if (ms > std::chrono::milliseconds::zero()) {
211     maxLatencyLoopTime_.setTimeInterval(us);
212     avgLoopTime_.setTimeInterval(us);
213   } else {
214     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
215   }
216 }
217
218 void EventBase::resetLoadAvg(double value) {
219   assert(enableTimeMeasurement_);
220   avgLoopTime_.reset(value);
221   maxLatencyLoopTime_.reset(value);
222 }
223
224 static std::chrono::milliseconds
225 getTimeDelta(std::chrono::steady_clock::time_point* prev) {
226   auto result = std::chrono::steady_clock::now() - *prev;
227   *prev = std::chrono::steady_clock::now();
228
229   return std::chrono::duration_cast<std::chrono::milliseconds>(result);
230 }
231
232 void EventBase::waitUntilRunning() {
233   while (!isRunning()) {
234     sched_yield();
235   }
236 }
237
238 // enters the event_base loop -- will only exit when forced to
239 bool EventBase::loop() {
240   return loopBody();
241 }
242
243 bool EventBase::loopOnce(int flags) {
244   return loopBody(flags | EVLOOP_ONCE);
245 }
246
247 bool EventBase::loopBody(int flags) {
248   VLOG(5) << "EventBase(): Starting loop.";
249
250   DCHECK(!invokingLoop_)
251       << "Your code just tried to loop over an event base from inside another "
252       << "event base loop. Since libevent is not reentrant, this leads to "
253       << "undefined behavior in opt builds. Please fix immediately. For the "
254       << "common case of an inner function that needs to do some synchronous "
255       << "computation on an event-base, replace getEventBase() by a new, "
256       << "stack-allocated EvenBase.";
257   invokingLoop_ = true;
258   SCOPE_EXIT {
259     invokingLoop_ = false;
260   };
261
262   int res = 0;
263   bool ranLoopCallbacks;
264   bool blocking = !(flags & EVLOOP_NONBLOCK);
265   bool once = (flags & EVLOOP_ONCE);
266
267   // time-measurement variables.
268   std::chrono::steady_clock::time_point prev;
269   std::chrono::steady_clock::time_point idleStart = {};
270   std::chrono::microseconds busy;
271   std::chrono::microseconds idle;
272
273   loopThread_.store(std::this_thread::get_id(), std::memory_order_release);
274
275   if (!name_.empty()) {
276     setThreadName(name_);
277   }
278
279   if (enableTimeMeasurement_) {
280     prev = std::chrono::steady_clock::now();
281     idleStart = std::chrono::steady_clock::now();
282   }
283
284   while (!stop_.load(std::memory_order_acquire)) {
285     applyLoopKeepAlive();
286     ++nextLoopCnt_;
287
288     // Run the before loop callbacks
289     LoopCallbackList callbacks;
290     callbacks.swap(runBeforeLoopCallbacks_);
291
292     while(!callbacks.empty()) {
293       auto* item = &callbacks.front();
294       callbacks.pop_front();
295       item->runLoopCallback();
296     }
297
298     // nobody can add loop callbacks from within this thread if
299     // we don't have to handle anything to start with...
300     if (blocking && loopCallbacks_.empty()) {
301       res = event_base_loop(evb_, EVLOOP_ONCE);
302     } else {
303       res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
304     }
305
306     ranLoopCallbacks = runLoopCallbacks();
307
308     if (enableTimeMeasurement_) {
309       busy = std::chrono::duration_cast<std::chrono::microseconds>(
310           std::chrono::steady_clock::now() - startWork_);
311       idle = std::chrono::duration_cast<std::chrono::microseconds>(
312           startWork_ - idleStart);
313
314       avgLoopTime_.addSample(std::chrono::microseconds(idle),
315         std::chrono::microseconds(busy));
316       maxLatencyLoopTime_.addSample(std::chrono::microseconds(idle),
317         std::chrono::microseconds(busy));
318
319       if (observer_) {
320         if (observerSampleCount_++ == observer_->getSampleRate()) {
321           observerSampleCount_ = 0;
322           observer_->loopSample(busy.count(), idle.count());
323         }
324       }
325
326       VLOG(11) << "EventBase "  << this         << " did not timeout " <<
327         " loop time guess: "    << (busy + idle).count()  <<
328         " idle time: "          << idle.count()         <<
329         " busy time: "          << busy.count()         <<
330         " avgLoopTime: "        << avgLoopTime_.get() <<
331         " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
332         " maxLatency_: "        << maxLatency_.count() << "us" <<
333         " notificationQueueSize: " << getNotificationQueueSize() <<
334         " nothingHandledYet(): " << nothingHandledYet();
335
336       // see if our average loop time has exceeded our limit
337       if ((maxLatency_ > std::chrono::microseconds::zero()) &&
338           (maxLatencyLoopTime_.get() > double(maxLatency_.count()))) {
339         maxLatencyCob_();
340         // back off temporarily -- don't keep spamming maxLatencyCob_
341         // if we're only a bit over the limit
342         maxLatencyLoopTime_.dampen(0.9);
343       }
344
345       // Our loop run did real work; reset the idle timer
346       idleStart = std::chrono::steady_clock::now();
347     } else {
348       VLOG(11) << "EventBase " << this << " did not timeout";
349     }
350
351     // If the event loop indicate that there were no more events, and
352     // we also didn't have any loop callbacks to run, there is nothing left to
353     // do.
354     if (res != 0 && !ranLoopCallbacks) {
355       // Since Notification Queue is marked 'internal' some events may not have
356       // run.  Run them manually if so, and continue looping.
357       //
358       if (getNotificationQueueSize() > 0) {
359         fnRunner_->handlerReady(0);
360       } else {
361         break;
362       }
363     }
364
365     if (enableTimeMeasurement_) {
366       VLOG(5) << "EventBase " << this << " loop time: " <<
367         getTimeDelta(&prev).count();
368     }
369
370     if (once) {
371       break;
372     }
373   }
374   // Reset stop_ so loop() can be called again
375   stop_ = false;
376
377   if (res < 0) {
378     LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
379     return false;
380   } else if (res == 1) {
381     VLOG(5) << "EventBase: ran out of events (exiting loop)!";
382   } else if (res > 1) {
383     LOG(ERROR) << "EventBase: unknown event loop result = " << res;
384     return false;
385   }
386
387   loopThread_.store({}, std::memory_order_release);
388
389   VLOG(5) << "EventBase(): Done with loop.";
390   return true;
391 }
392
393 ssize_t EventBase::loopKeepAliveCount() {
394   if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) {
395     loopKeepAliveCount_ +=
396         loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed);
397   }
398   DCHECK_GE(loopKeepAliveCount_, 0);
399   return loopKeepAliveCount_;
400 }
401
402 void EventBase::applyLoopKeepAlive() {
403   if (loopKeepAliveActive_ && loopKeepAliveCount() == 0) {
404     // Restore the notification queue internal flag
405     fnRunner_->stopConsuming();
406     fnRunner_->startConsumingInternal(this, queue_.get());
407     loopKeepAliveActive_ = false;
408   } else if (!loopKeepAliveActive_ && loopKeepAliveCount() > 0) {
409     // Update the notification queue event to treat it as a normal
410     // (non-internal) event.  The notification queue event always remains
411     // installed, and the main loop won't exit with it installed.
412     fnRunner_->stopConsuming();
413     fnRunner_->startConsuming(this, queue_.get());
414     loopKeepAliveActive_ = true;
415   }
416 }
417
418 void EventBase::loopForever() {
419   bool ret;
420   {
421     SCOPE_EXIT {
422       applyLoopKeepAlive();
423     };
424     // Make sure notification queue events are treated as normal events.
425     // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
426     // released inside a loop.
427     ++loopKeepAliveCount_;
428     SCOPE_EXIT {
429       --loopKeepAliveCount_;
430     };
431     ret = loop();
432   }
433
434   if (!ret) {
435     folly::throwSystemError("error in EventBase::loopForever()");
436   }
437 }
438
439 void EventBase::bumpHandlingTime() {
440   if (!enableTimeMeasurement_) {
441     return;
442   }
443
444   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
445     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
446   if (nothingHandledYet()) {
447     latestLoopCnt_ = nextLoopCnt_;
448     // set the time
449     startWork_ = std::chrono::steady_clock::now();
450
451     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
452              << " (loop) startWork_ " << startWork_.time_since_epoch().count();
453   }
454 }
455
456 void EventBase::terminateLoopSoon() {
457   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
458
459   // Set stop to true, so the event loop will know to exit.
460   // TODO: We should really use an atomic operation here with a release
461   // barrier.
462   stop_ = true;
463
464   // Call event_base_loopbreak() so that libevent will exit the next time
465   // around the loop.
466   event_base_loopbreak(evb_);
467
468   // If terminateLoopSoon() is called from another thread,
469   // the EventBase thread might be stuck waiting for events.
470   // In this case, it won't wake up and notice that stop_ is set until it
471   // receives another event.  Send an empty frame to the notification queue
472   // so that the event loop will wake up even if there are no other events.
473   //
474   // We don't care about the return value of trySendFrame().  If it fails
475   // this likely means the EventBase already has lots of events waiting
476   // anyway.
477   try {
478     queue_->putMessage(nullptr);
479   } catch (...) {
480     // We don't care if putMessage() fails.  This likely means
481     // the EventBase already has lots of events waiting anyway.
482   }
483 }
484
485 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
486   DCHECK(isInEventBaseThread());
487   callback->cancelLoopCallback();
488   callback->context_ = RequestContext::saveContext();
489   if (runOnceCallbacks_ != nullptr && thisIteration) {
490     runOnceCallbacks_->push_back(*callback);
491   } else {
492     loopCallbacks_.push_back(*callback);
493   }
494 }
495
496 void EventBase::runInLoop(Func cob, bool thisIteration) {
497   DCHECK(isInEventBaseThread());
498   auto wrapper = new FunctionLoopCallback(std::move(cob));
499   wrapper->context_ = RequestContext::saveContext();
500   if (runOnceCallbacks_ != nullptr && thisIteration) {
501     runOnceCallbacks_->push_back(*wrapper);
502   } else {
503     loopCallbacks_.push_back(*wrapper);
504   }
505 }
506
507 void EventBase::runOnDestruction(LoopCallback* callback) {
508   std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
509   callback->cancelLoopCallback();
510   onDestructionCallbacks_.push_back(*callback);
511 }
512
513 void EventBase::runBeforeLoop(LoopCallback* callback) {
514   DCHECK(isInEventBaseThread());
515   callback->cancelLoopCallback();
516   runBeforeLoopCallbacks_.push_back(*callback);
517 }
518
519 bool EventBase::runInEventBaseThread(Func fn) {
520   // Send the message.
521   // It will be received by the FunctionRunner in the EventBase's thread.
522
523   // We try not to schedule nullptr callbacks
524   if (!fn) {
525     LOG(ERROR) << "EventBase " << this
526                << ": Scheduling nullptr callbacks is not allowed";
527     return false;
528   }
529
530   // Short-circuit if we are already in our event base
531   if (inRunningEventBaseThread()) {
532     runInLoop(std::move(fn));
533     return true;
534
535   }
536
537   try {
538     queue_->putMessage(std::move(fn));
539   } catch (const std::exception& ex) {
540     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
541                << "for EventBase thread: " << ex.what();
542     return false;
543   }
544
545   return true;
546 }
547
548 bool EventBase::runInEventBaseThreadAndWait(FuncRef fn) {
549   if (inRunningEventBaseThread()) {
550     LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
551                << "allowed";
552     return false;
553   }
554
555   bool ready = false;
556   std::mutex m;
557   std::condition_variable cv;
558   runInEventBaseThread([&] {
559       SCOPE_EXIT {
560         std::unique_lock<std::mutex> l(m);
561         ready = true;
562         cv.notify_one();
563         // We cannot release the lock before notify_one, because a spurious
564         // wakeup in the waiting thread may lead to cv and m going out of scope
565         // prematurely.
566       };
567       fn();
568   });
569   std::unique_lock<std::mutex> l(m);
570   cv.wait(l, [&] { return ready; });
571
572   return true;
573 }
574
575 bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(FuncRef fn) {
576   if (isInEventBaseThread()) {
577     fn();
578     return true;
579   } else {
580     return runInEventBaseThreadAndWait(std::move(fn));
581   }
582 }
583
584 bool EventBase::runLoopCallbacks() {
585   if (!loopCallbacks_.empty()) {
586     bumpHandlingTime();
587     // Swap the loopCallbacks_ list with a temporary list on our stack.
588     // This way we will only run callbacks scheduled at the time
589     // runLoopCallbacks() was invoked.
590     //
591     // If any of these callbacks in turn call runInLoop() to schedule more
592     // callbacks, those new callbacks won't be run until the next iteration
593     // around the event loop.  This prevents runInLoop() callbacks from being
594     // able to start file descriptor and timeout based events.
595     LoopCallbackList currentCallbacks;
596     currentCallbacks.swap(loopCallbacks_);
597     runOnceCallbacks_ = &currentCallbacks;
598
599     while (!currentCallbacks.empty()) {
600       LoopCallback* callback = &currentCallbacks.front();
601       currentCallbacks.pop_front();
602       folly::RequestContextScopeGuard rctx(callback->context_);
603       callback->runLoopCallback();
604     }
605
606     runOnceCallbacks_ = nullptr;
607     return true;
608   }
609   return false;
610 }
611
612 void EventBase::initNotificationQueue() {
613   // Infinite size queue
614   queue_.reset(new NotificationQueue<Func>());
615
616   // We allocate fnRunner_ separately, rather than declaring it directly
617   // as a member of EventBase solely so that we don't need to include
618   // NotificationQueue.h from EventBase.h
619   fnRunner_.reset(new FunctionRunner());
620
621   // Mark this as an internal event, so event_base_loop() will return if
622   // there are no other events besides this one installed.
623   //
624   // Most callers don't care about the internal notification queue used by
625   // EventBase.  The queue is always installed, so if we did count the queue as
626   // an active event, loop() would never exit with no more events to process.
627   // Users can use loopForever() if they do care about the notification queue.
628   // (This is useful for EventBase threads that do nothing but process
629   // runInEventBaseThread() notifications.)
630   fnRunner_->startConsumingInternal(this, queue_.get());
631 }
632
633 void EventBase::SmoothLoopTime::setTimeInterval(
634     std::chrono::microseconds timeInterval) {
635   expCoeff_ = -1.0 / timeInterval.count();
636   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
637 }
638
639 void EventBase::SmoothLoopTime::reset(double value) {
640   value_ = value;
641 }
642
643 void EventBase::SmoothLoopTime::addSample(
644     std::chrono::microseconds idle,
645     std::chrono::microseconds busy) {
646   /*
647    * Position at which the busy sample is considered to be taken.
648    * (Allows to quickly skew our average without editing much code)
649    */
650   enum BusySamplePosition {
651     RIGHT = 0, // busy sample placed at the end of the iteration
652     CENTER = 1, // busy sample placed at the middle point of the iteration
653     LEFT = 2, // busy sample placed at the beginning of the iteration
654   };
655
656   // See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
657   // and D676020 for more info on this calculation.
658   VLOG(11) << "idle " << idle.count() << " oldBusyLeftover_ "
659            << oldBusyLeftover_.count() << " idle + oldBusyLeftover_ "
660            << (idle + oldBusyLeftover_).count() << " busy " << busy.count()
661            << " " << __PRETTY_FUNCTION__;
662   idle += oldBusyLeftover_ + busy;
663   oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
664   idle -= oldBusyLeftover_;
665
666   double coeff = exp(idle.count() * expCoeff_);
667   value_ *= coeff;
668   value_ += (1.0 - coeff) * busy.count();
669 }
670
671 bool EventBase::nothingHandledYet() const noexcept {
672   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
673   return (nextLoopCnt_ != latestLoopCnt_);
674 }
675
676 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
677                                       InternalEnum internal) {
678
679   struct event* ev = obj->getEvent();
680   assert(ev->ev_base == nullptr);
681
682   event_base_set(getLibeventBase(), ev);
683   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
684     // Set the EVLIST_INTERNAL flag
685     event_ref_flags(ev) |= EVLIST_INTERNAL;
686   }
687 }
688
689 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
690   cancelTimeout(obj);
691   struct event* ev = obj->getEvent();
692   ev->ev_base = nullptr;
693 }
694
695 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
696                                  TimeoutManager::timeout_type timeout) {
697   assert(isInEventBaseThread());
698   // Set up the timeval and add the event
699   struct timeval tv;
700   tv.tv_sec = long(timeout.count() / 1000LL);
701   tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL);
702
703   struct event* ev = obj->getEvent();
704   if (event_add(ev, &tv) < 0) {
705     LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
706     return false;
707   }
708
709   return true;
710 }
711
712 void EventBase::cancelTimeout(AsyncTimeout* obj) {
713   assert(isInEventBaseThread());
714   struct event* ev = obj->getEvent();
715   if (EventUtil::isEventRegistered(ev)) {
716     event_del(ev);
717   }
718 }
719
720 void EventBase::setName(const std::string& name) {
721   assert(isInEventBaseThread());
722   name_ = name;
723
724   if (isRunning()) {
725     setThreadName(loopThread_.load(std::memory_order_relaxed),
726                   name_);
727   }
728 }
729
730 const std::string& EventBase::getName() {
731   assert(isInEventBaseThread());
732   return name_;
733 }
734
735 const char* EventBase::getLibeventVersion() { return event_get_version(); }
736 const char* EventBase::getLibeventMethod() { return event_get_method(); }
737
738 } // folly