0e34c5e2c71b50fc6279c01aeec360d76a1c8c2f
[folly.git] / folly / io / async / EventBase.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include <folly/io/async/EventBase.h>
22
23 #include <folly/ThreadName.h>
24 #include <folly/io/async/NotificationQueue.h>
25
26 #include <boost/static_assert.hpp>
27 #include <fcntl.h>
28 #include <pthread.h>
29 #include <unistd.h>
30
31 namespace {
32
33 using folly::Cob;
34 using folly::EventBase;
35
36 template <typename Callback>
37 class FunctionLoopCallback : public EventBase::LoopCallback {
38  public:
39   explicit FunctionLoopCallback(Cob&& function)
40       : function_(std::move(function)) {}
41
42   explicit FunctionLoopCallback(const Cob& function)
43       : function_(function) {}
44
45   virtual void runLoopCallback() noexcept {
46     function_();
47     delete this;
48   }
49
50  private:
51   Callback function_;
52 };
53
54 }
55
56 namespace folly {
57
58 const int kNoFD = -1;
59
60 /*
61  * EventBase::FunctionRunner
62  */
63
64 class EventBase::FunctionRunner
65     : public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
66  public:
67   void messageAvailable(std::pair<void (*)(void*), void*>&& msg) {
68
69     // In libevent2, internal events do not break the loop.
70     // Most users would expect loop(), followed by runInEventBaseThread(),
71     // to break the loop and check if it should exit or not.
72     // To have similar bejaviour to libevent1.4, tell the loop to break here.
73     // Note that loop() may still continue to loop, but it will also check the
74     // stop_ flag as well as runInLoop callbacks, etc.
75     event_base_loopbreak(getEventBase()->evb_);
76
77     if (msg.first == nullptr && msg.second == nullptr) {
78       // terminateLoopSoon() sends a null message just to
79       // wake up the loop.  We can ignore these messages.
80       return;
81     }
82
83     // If function is nullptr, just log and move on
84     if (!msg.first) {
85       LOG(ERROR) << "nullptr callback registered to be run in "
86                  << "event base thread";
87       return;
88     }
89
90     // The function should never throw an exception, because we have no
91     // way of knowing what sort of error handling to perform.
92     //
93     // If it does throw, log a message and abort the program.
94     try {
95       msg.first(msg.second);
96     } catch (const std::exception& ex) {
97       LOG(ERROR) << "runInEventBaseThread() function threw a "
98                  << typeid(ex).name() << " exception: " << ex.what();
99       abort();
100     } catch (...) {
101       LOG(ERROR) << "runInEventBaseThread() function threw an exception";
102       abort();
103     }
104   }
105 };
106
107 /*
108  * EventBase::CobTimeout methods
109  */
110
111 void EventBase::CobTimeout::timeoutExpired() noexcept {
112   // For now, we just swallow any exceptions that the callback threw.
113   try {
114     cob_();
115   } catch (const std::exception& ex) {
116     LOG(ERROR) << "EventBase::runAfterDelay() callback threw "
117                << typeid(ex).name() << " exception: " << ex.what();
118   } catch (...) {
119     LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception "
120                << "type";
121   }
122
123   // The CobTimeout object was allocated on the heap by runAfterDelay(),
124   // so delete it now that the it has fired.
125   delete this;
126 }
127
128
129 // The interface used to libevent is not thread-safe.  Calls to
130 // event_init() and event_base_free() directly modify an internal
131 // global 'current_base', so a mutex is required to protect this.
132 //
133 // event_init() should only ever be called once.  Subsequent calls
134 // should be made to event_base_new().  We can recognise that
135 // event_init() has already been called by simply inspecting current_base.
136 static std::mutex libevent_mutex_;
137
138 /*
139  * EventBase methods
140  */
141
142 EventBase::EventBase()
143   : runOnceCallbacks_(nullptr)
144   , stop_(false)
145   , loopThread_(0)
146   , queue_(nullptr)
147   , fnRunner_(nullptr)
148   , maxLatency_(0)
149   , avgLoopTime_(2000000)
150   , maxLatencyLoopTime_(avgLoopTime_)
151   , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
152   , latestLoopCnt_(nextLoopCnt_)
153   , startWork_(0)
154   , observer_(nullptr)
155   , observerSampleCount_(0) {
156   {
157     std::lock_guard<std::mutex> lock(libevent_mutex_);
158
159     // The value 'current_base' (libevent 1) or
160     // 'event_global_current_base_' (libevent 2) is filled in by event_set(),
161     // allowing examination of its value without an explicit reference here.
162     // If ev.ev_base is NULL, then event_init() must be called, otherwise
163     // call event_base_new().
164     struct event ev;
165     event_set(&ev, 0, 0, nullptr, nullptr);
166     evb_ = (ev.ev_base) ? event_base_new() : event_init();
167   }
168   if (UNLIKELY(evb_ == nullptr)) {
169     LOG(ERROR) << "EventBase(): Failed to init event base.";
170     folly::throwSystemError("error in EventBase::EventBase()");
171   }
172   VLOG(5) << "EventBase(): Created.";
173   initNotificationQueue();
174   RequestContext::getStaticContext();
175 }
176
177 // takes ownership of the event_base
178 EventBase::EventBase(event_base* evb)
179   : runOnceCallbacks_(nullptr)
180   , stop_(false)
181   , loopThread_(0)
182   , evb_(evb)
183   , queue_(nullptr)
184   , fnRunner_(nullptr)
185   , maxLatency_(0)
186   , avgLoopTime_(2000000)
187   , maxLatencyLoopTime_(avgLoopTime_)
188   , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
189   , latestLoopCnt_(nextLoopCnt_)
190   , startWork_(0)
191   , observer_(nullptr)
192   , observerSampleCount_(0) {
193   if (UNLIKELY(evb_ == nullptr)) {
194     LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
195     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
196   }
197   initNotificationQueue();
198   RequestContext::getStaticContext();
199 }
200
201 EventBase::~EventBase() {
202   // Call all destruction callbacks, before we start cleaning up our state.
203   while (!onDestructionCallbacks_.empty()) {
204     LoopCallback* callback = &onDestructionCallbacks_.front();
205     onDestructionCallbacks_.pop_front();
206     callback->runLoopCallback();
207   }
208
209   // Delete any unfired callback objects, so that we don't leak memory
210   // (Note that we don't fire them.  The caller is responsible for cleaning up
211   // its own data structures if it destroys the EventBase with unfired events
212   // remaining.)
213   while (!pendingCobTimeouts_.empty()) {
214     CobTimeout* timeout = &pendingCobTimeouts_.front();
215     delete timeout;
216   }
217
218   while (!noWaitLoopCallbacks_.empty()) {
219     delete &noWaitLoopCallbacks_.front();
220   }
221
222   (void) runLoopCallbacks(false);
223
224   // Stop consumer before deleting NotificationQueue
225   fnRunner_->stopConsuming();
226   {
227     std::lock_guard<std::mutex> lock(libevent_mutex_);
228     event_base_free(evb_);
229   }
230   VLOG(5) << "EventBase(): Destroyed.";
231 }
232
233 int EventBase::getNotificationQueueSize() const {
234   return queue_->size();
235 }
236
237 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
238   fnRunner_->setMaxReadAtOnce(maxAtOnce);
239 }
240
241 // Set smoothing coefficient for loop load average; input is # of milliseconds
242 // for exp(-1) decay.
243 void EventBase::setLoadAvgMsec(uint32_t ms) {
244   uint64_t us = 1000 * ms;
245   if (ms > 0) {
246     maxLatencyLoopTime_.setTimeInterval(us);
247     avgLoopTime_.setTimeInterval(us);
248   } else {
249     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
250   }
251 }
252
253 void EventBase::resetLoadAvg(double value) {
254   avgLoopTime_.reset(value);
255   maxLatencyLoopTime_.reset(value);
256 }
257
258 static std::chrono::milliseconds
259 getTimeDelta(std::chrono::steady_clock::time_point* prev) {
260   auto result = std::chrono::steady_clock::now() - *prev;
261   *prev = std::chrono::steady_clock::now();
262
263   return std::chrono::duration_cast<std::chrono::milliseconds>(result);
264 }
265
266 void EventBase::waitUntilRunning() {
267   while (!isRunning()) {
268     sched_yield();
269   }
270 }
271
272 // enters the event_base loop -- will only exit when forced to
273 bool EventBase::loop() {
274   return loopBody();
275 }
276
277 bool EventBase::loopOnce(int flags) {
278   return loopBody(flags | EVLOOP_ONCE);
279 }
280
281 bool EventBase::loopBody(int flags) {
282   VLOG(5) << "EventBase(): Starting loop.";
283   int res = 0;
284   bool ranLoopCallbacks;
285   bool blocking = !(flags & EVLOOP_NONBLOCK);
286   bool once = (flags & EVLOOP_ONCE);
287
288   loopThread_.store(pthread_self(), std::memory_order_release);
289
290   if (!name_.empty()) {
291     setThreadName(name_);
292   }
293
294   auto prev = std::chrono::steady_clock::now();
295   int64_t idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
296     std::chrono::steady_clock::now().time_since_epoch()).count();
297
298   // TODO: Read stop_ atomically with an acquire barrier.
299   while (!stop_) {
300     ++nextLoopCnt_;
301
302     // nobody can add loop callbacks from within this thread if
303     // we don't have to handle anything to start with...
304     if (blocking && loopCallbacks_.empty()) {
305       LoopCallbackList callbacks;
306       callbacks.swap(noWaitLoopCallbacks_);
307
308       while(!callbacks.empty()) {
309         auto* item = &callbacks.front();
310         callbacks.pop_front();
311         item->runLoopCallback();
312       }
313
314       res = event_base_loop(evb_, EVLOOP_ONCE);
315     } else {
316       res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
317     }
318
319     ranLoopCallbacks = runLoopCallbacks();
320
321     int64_t busy = std::chrono::duration_cast<std::chrono::microseconds>(
322       std::chrono::steady_clock::now().time_since_epoch()).count() - startWork_;
323     int64_t idle = startWork_ - idleStart;
324
325     avgLoopTime_.addSample(idle, busy);
326     maxLatencyLoopTime_.addSample(idle, busy);
327
328     if (observer_) {
329       if (observerSampleCount_++ == observer_->getSampleRate()) {
330         observerSampleCount_ = 0;
331         observer_->loopSample(busy, idle);
332       }
333     }
334
335     VLOG(11) << "EventBase " << this         << " did not timeout "
336      " loop time guess: "    << busy + idle  <<
337      " idle time: "          << idle         <<
338      " busy time: "          << busy         <<
339      " avgLoopTime: "        << avgLoopTime_.get() <<
340      " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
341      " maxLatency_: "        << maxLatency_ <<
342      " nothingHandledYet(): "<< nothingHandledYet();
343
344     // see if our average loop time has exceeded our limit
345     if ((maxLatency_ > 0) &&
346         (maxLatencyLoopTime_.get() > double(maxLatency_))) {
347       maxLatencyCob_();
348       // back off temporarily -- don't keep spamming maxLatencyCob_
349       // if we're only a bit over the limit
350       maxLatencyLoopTime_.dampen(0.9);
351     }
352
353     // Our loop run did real work; reset the idle timer
354     idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
355       std::chrono::steady_clock::now().time_since_epoch()).count();
356
357     // If the event loop indicate that there were no more events, and
358     // we also didn't have any loop callbacks to run, there is nothing left to
359     // do.
360     if (res != 0 && !ranLoopCallbacks) {
361       // Since Notification Queue is marked 'internal' some events may not have
362       // run.  Run them manually if so, and continue looping.
363       //
364       if (getNotificationQueueSize() > 0) {
365         fnRunner_->handlerReady(0);
366       } else {
367         break;
368       }
369     }
370
371     VLOG(5) << "EventBase " << this << " loop time: " <<
372       getTimeDelta(&prev).count();
373
374     if (once) {
375       break;
376     }
377   }
378   // Reset stop_ so loop() can be called again
379   stop_ = false;
380
381   if (res < 0) {
382     LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
383     return false;
384   } else if (res == 1) {
385     VLOG(5) << "EventBase: ran out of events (exiting loop)!";
386   } else if (res > 1) {
387     LOG(ERROR) << "EventBase: unknown event loop result = " << res;
388     return false;
389   }
390
391   loopThread_.store(0, std::memory_order_release);
392
393   VLOG(5) << "EventBase(): Done with loop.";
394   return true;
395 }
396
397 void EventBase::loopForever() {
398   // Update the notification queue event to treat it as a normal (non-internal)
399   // event.  The notification queue event always remains installed, and the main
400   // loop won't exit with it installed.
401   fnRunner_->stopConsuming();
402   fnRunner_->startConsuming(this, queue_.get());
403
404   bool ret = loop();
405
406   // Restore the notification queue internal flag
407   fnRunner_->stopConsuming();
408   fnRunner_->startConsumingInternal(this, queue_.get());
409
410   if (!ret) {
411     folly::throwSystemError("error in EventBase::loopForever()");
412   }
413 }
414
415 bool EventBase::bumpHandlingTime() {
416   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
417     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
418   if(nothingHandledYet()) {
419     latestLoopCnt_ = nextLoopCnt_;
420     // set the time
421     startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
422       std::chrono::steady_clock::now().time_since_epoch()).count();
423
424     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
425       " (loop) startWork_ " << startWork_;
426     return true;
427   }
428   return false;
429 }
430
431 void EventBase::terminateLoopSoon() {
432   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
433
434   // Set stop to true, so the event loop will know to exit.
435   // TODO: We should really use an atomic operation here with a release
436   // barrier.
437   stop_ = true;
438
439   // Call event_base_loopbreak() so that libevent will exit the next time
440   // around the loop.
441   event_base_loopbreak(evb_);
442
443   // If terminateLoopSoon() is called from another thread,
444   // the EventBase thread might be stuck waiting for events.
445   // In this case, it won't wake up and notice that stop_ is set until it
446   // receives another event.  Send an empty frame to the notification queue
447   // so that the event loop will wake up even if there are no other events.
448   //
449   // We don't care about the return value of trySendFrame().  If it fails
450   // this likely means the EventBase already has lots of events waiting
451   // anyway.
452   try {
453     queue_->putMessage(std::make_pair(nullptr, nullptr));
454   } catch (...) {
455     // We don't care if putMessage() fails.  This likely means
456     // the EventBase already has lots of events waiting anyway.
457   }
458 }
459
460 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
461   DCHECK(isInEventBaseThread());
462   callback->cancelLoopCallback();
463   callback->context_ = RequestContext::saveContext();
464   if (runOnceCallbacks_ != nullptr && thisIteration) {
465     runOnceCallbacks_->push_back(*callback);
466   } else {
467     loopCallbacks_.push_back(*callback);
468   }
469 }
470
471 void EventBase::runInLoop(const Cob& cob, bool thisIteration) {
472   DCHECK(isInEventBaseThread());
473   auto wrapper = new FunctionLoopCallback<Cob>(cob);
474   wrapper->context_ = RequestContext::saveContext();
475   if (runOnceCallbacks_ != nullptr && thisIteration) {
476     runOnceCallbacks_->push_back(*wrapper);
477   } else {
478     loopCallbacks_.push_back(*wrapper);
479   }
480 }
481
482 void EventBase::runInLoop(Cob&& cob, bool thisIteration) {
483   DCHECK(isInEventBaseThread());
484   auto wrapper = new FunctionLoopCallback<Cob>(std::move(cob));
485   wrapper->context_ = RequestContext::saveContext();
486   if (runOnceCallbacks_ != nullptr && thisIteration) {
487     runOnceCallbacks_->push_back(*wrapper);
488   } else {
489     loopCallbacks_.push_back(*wrapper);
490   }
491 }
492
493 void EventBase::runOnDestruction(LoopCallback* callback) {
494   DCHECK(isInEventBaseThread());
495   callback->cancelLoopCallback();
496   onDestructionCallbacks_.push_back(*callback);
497 }
498
499 void EventBase::runBeforeLoop(LoopCallback* callback) {
500   DCHECK(isInEventBaseThread());
501   callback->cancelLoopCallback();
502   noWaitLoopCallbacks_.push_back(*callback);
503 }
504
505 bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
506   // Send the message.
507   // It will be received by the FunctionRunner in the EventBase's thread.
508
509   // We try not to schedule nullptr callbacks
510   if (!fn) {
511     LOG(ERROR) << "EventBase " << this
512                << ": Scheduling nullptr callbacks is not allowed";
513     return false;
514   }
515
516   // Short-circuit if we are already in our event base
517   if (inRunningEventBaseThread()) {
518     runInLoop(new RunInLoopCallback(fn, arg));
519     return true;
520
521   }
522
523   try {
524     queue_->putMessage(std::make_pair(fn, arg));
525   } catch (const std::exception& ex) {
526     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
527                << fn << "for EventBase thread: " << ex.what();
528     return false;
529   }
530
531   return true;
532 }
533
534 bool EventBase::runInEventBaseThread(const Cob& fn) {
535   // Short-circuit if we are already in our event base
536   if (inRunningEventBaseThread()) {
537     runInLoop(fn);
538     return true;
539   }
540
541   Cob* fnCopy;
542   // Allocate a copy of the function so we can pass it to the other thread
543   // The other thread will delete this copy once the function has been run
544   try {
545     fnCopy = new Cob(fn);
546   } catch (const std::bad_alloc& ex) {
547     LOG(ERROR) << "failed to allocate tr::function copy "
548                << "for runInEventBaseThread()";
549     return false;
550   }
551
552   if (!runInEventBaseThread(&EventBase::runFunctionPtr, fnCopy)) {
553     delete fnCopy;
554     return false;
555   }
556
557   return true;
558 }
559
560 bool EventBase::runAfterDelay(const Cob& cob,
561                                int milliseconds,
562                                TimeoutManager::InternalEnum in) {
563   CobTimeout* timeout = new CobTimeout(this, cob, in);
564   if (!timeout->scheduleTimeout(milliseconds)) {
565     delete timeout;
566     return false;
567   }
568
569   pendingCobTimeouts_.push_back(*timeout);
570   return true;
571 }
572
573 bool EventBase::runLoopCallbacks(bool setContext) {
574   if (!loopCallbacks_.empty()) {
575     bumpHandlingTime();
576     // Swap the loopCallbacks_ list with a temporary list on our stack.
577     // This way we will only run callbacks scheduled at the time
578     // runLoopCallbacks() was invoked.
579     //
580     // If any of these callbacks in turn call runInLoop() to schedule more
581     // callbacks, those new callbacks won't be run until the next iteration
582     // around the event loop.  This prevents runInLoop() callbacks from being
583     // able to start file descriptor and timeout based events.
584     LoopCallbackList currentCallbacks;
585     currentCallbacks.swap(loopCallbacks_);
586     runOnceCallbacks_ = &currentCallbacks;
587
588     while (!currentCallbacks.empty()) {
589       LoopCallback* callback = &currentCallbacks.front();
590       currentCallbacks.pop_front();
591       if (setContext) {
592         RequestContext::setContext(callback->context_);
593       }
594       callback->runLoopCallback();
595     }
596
597     runOnceCallbacks_ = nullptr;
598     return true;
599   }
600   return false;
601 }
602
603 void EventBase::initNotificationQueue() {
604   // Infinite size queue
605   queue_.reset(new NotificationQueue<std::pair<void (*)(void*), void*>>());
606
607   // We allocate fnRunner_ separately, rather than declaring it directly
608   // as a member of EventBase solely so that we don't need to include
609   // NotificationQueue.h from EventBase.h
610   fnRunner_.reset(new FunctionRunner());
611
612   // Mark this as an internal event, so event_base_loop() will return if
613   // there are no other events besides this one installed.
614   //
615   // Most callers don't care about the internal notification queue used by
616   // EventBase.  The queue is always installed, so if we did count the queue as
617   // an active event, loop() would never exit with no more events to process.
618   // Users can use loopForever() if they do care about the notification queue.
619   // (This is useful for EventBase threads that do nothing but process
620   // runInEventBaseThread() notifications.)
621   fnRunner_->startConsumingInternal(this, queue_.get());
622 }
623
624 void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) {
625   expCoeff_ = -1.0/timeInterval;
626   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
627 }
628
629 void EventBase::SmoothLoopTime::reset(double value) {
630   value_ = value;
631 }
632
633 void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) {
634     /*
635      * Position at which the busy sample is considered to be taken.
636      * (Allows to quickly skew our average without editing much code)
637      */
638     enum BusySamplePosition {
639       RIGHT = 0,  // busy sample placed at the end of the iteration
640       CENTER = 1, // busy sample placed at the middle point of the iteration
641       LEFT = 2,   // busy sample placed at the beginning of the iteration
642     };
643
644   VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ <<
645               " idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ <<
646               " busy " << busy << " " << __PRETTY_FUNCTION__;
647   idle += oldBusyLeftover_ + busy;
648   oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
649   idle -= oldBusyLeftover_;
650
651   double coeff = exp(idle * expCoeff_);
652   value_ *= coeff;
653   value_ += (1.0 - coeff) * busy;
654 }
655
656 bool EventBase::nothingHandledYet() {
657   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
658   return (nextLoopCnt_ != latestLoopCnt_);
659 }
660
661 /* static */
662 void EventBase::runFunctionPtr(Cob* fn) {
663   // The function should never throw an exception, because we have no
664   // way of knowing what sort of error handling to perform.
665   //
666   // If it does throw, log a message and abort the program.
667   try {
668     (*fn)();
669   } catch (const std::exception &ex) {
670     LOG(ERROR) << "runInEventBaseThread() std::function threw a "
671                << typeid(ex).name() << " exception: " << ex.what();
672     abort();
673   } catch (...) {
674     LOG(ERROR) << "runInEventBaseThread() std::function threw an exception";
675     abort();
676   }
677
678   // The function object was allocated by runInEventBaseThread().
679   // Delete it once it has been run.
680   delete fn;
681 }
682
683 EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg)
684     : fn_(fn)
685     , arg_(arg) {}
686
687 void EventBase::RunInLoopCallback::runLoopCallback() noexcept {
688   fn_(arg_);
689   delete this;
690 }
691
692 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
693                                       InternalEnum internal) {
694
695   struct event* ev = obj->getEvent();
696   assert(ev->ev_base == nullptr);
697
698   event_base_set(getLibeventBase(), ev);
699   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
700     // Set the EVLIST_INTERNAL flag
701     ev->ev_flags |= EVLIST_INTERNAL;
702   }
703 }
704
705 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
706   cancelTimeout(obj);
707   struct event* ev = obj->getEvent();
708   ev->ev_base = nullptr;
709 }
710
711 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
712                                  std::chrono::milliseconds timeout) {
713   assert(isInEventBaseThread());
714   // Set up the timeval and add the event
715   struct timeval tv;
716   tv.tv_sec = timeout.count() / 1000LL;
717   tv.tv_usec = (timeout.count() % 1000LL) * 1000LL;
718
719   struct event* ev = obj->getEvent();
720   if (event_add(ev, &tv) < 0) {
721     LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
722     return false;
723   }
724
725   return true;
726 }
727
728 void EventBase::cancelTimeout(AsyncTimeout* obj) {
729   assert(isInEventBaseThread());
730   struct event* ev = obj->getEvent();
731   if (EventUtil::isEventRegistered(ev)) {
732     event_del(ev);
733   }
734 }
735
736 void EventBase::setName(const std::string& name) {
737   assert(isInEventBaseThread());
738   name_ = name;
739
740   if (isRunning()) {
741     setThreadName(loopThread_.load(std::memory_order_relaxed),
742                   name_);
743   }
744 }
745
746 const std::string& EventBase::getName() {
747   assert(isInEventBaseThread());
748   return name_;
749 }
750
751 const char* EventBase::getLibeventVersion() { return event_get_version(); }
752 const char* EventBase::getLibeventMethod() { return event_get_method(); }
753
754 } // folly