add consumeUntilDrained API to NotificationQueue::Consumer
[folly.git] / folly / io / async / EventBase.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include <folly/io/async/EventBase.h>
22
23 #include <folly/ThreadName.h>
24 #include <folly/io/async/NotificationQueue.h>
25
26 #include <boost/static_assert.hpp>
27 #include <fcntl.h>
28 #include <pthread.h>
29 #include <unistd.h>
30
31 namespace {
32
33 using folly::Cob;
34 using folly::EventBase;
35
36 template <typename Callback>
37 class FunctionLoopCallback : public EventBase::LoopCallback {
38  public:
39   explicit FunctionLoopCallback(Cob&& function)
40       : function_(std::move(function)) {}
41
42   explicit FunctionLoopCallback(const Cob& function)
43       : function_(function) {}
44
45   virtual void runLoopCallback() noexcept {
46     function_();
47     delete this;
48   }
49
50  private:
51   Callback function_;
52 };
53
54 }
55
56 namespace folly {
57
58 const int kNoFD = -1;
59
60 /*
61  * EventBase::FunctionRunner
62  */
63
64 class EventBase::FunctionRunner
65     : public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
66  public:
67   void messageAvailable(std::pair<void (*)(void*), void*>&& msg) {
68
69     // In libevent2, internal events do not break the loop.
70     // Most users would expect loop(), followed by runInEventBaseThread(),
71     // to break the loop and check if it should exit or not.
72     // To have similar bejaviour to libevent1.4, tell the loop to break here.
73     // Note that loop() may still continue to loop, but it will also check the
74     // stop_ flag as well as runInLoop callbacks, etc.
75     event_base_loopbreak(getEventBase()->evb_);
76
77     if (msg.first == nullptr && msg.second == nullptr) {
78       // terminateLoopSoon() sends a null message just to
79       // wake up the loop.  We can ignore these messages.
80       return;
81     }
82
83     // If function is nullptr, just log and move on
84     if (!msg.first) {
85       LOG(ERROR) << "nullptr callback registered to be run in "
86                  << "event base thread";
87       return;
88     }
89
90     // The function should never throw an exception, because we have no
91     // way of knowing what sort of error handling to perform.
92     //
93     // If it does throw, log a message and abort the program.
94     try {
95       msg.first(msg.second);
96     } catch (const std::exception& ex) {
97       LOG(ERROR) << "runInEventBaseThread() function threw a "
98                  << typeid(ex).name() << " exception: " << ex.what();
99       abort();
100     } catch (...) {
101       LOG(ERROR) << "runInEventBaseThread() function threw an exception";
102       abort();
103     }
104   }
105 };
106
107 /*
108  * EventBase::CobTimeout methods
109  */
110
111 void EventBase::CobTimeout::timeoutExpired() noexcept {
112   // For now, we just swallow any exceptions that the callback threw.
113   try {
114     cob_();
115   } catch (const std::exception& ex) {
116     LOG(ERROR) << "EventBase::runAfterDelay() callback threw "
117                << typeid(ex).name() << " exception: " << ex.what();
118   } catch (...) {
119     LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception "
120                << "type";
121   }
122
123   // The CobTimeout object was allocated on the heap by runAfterDelay(),
124   // so delete it now that the it has fired.
125   delete this;
126 }
127
128
129 // The interface used to libevent is not thread-safe.  Calls to
130 // event_init() and event_base_free() directly modify an internal
131 // global 'current_base', so a mutex is required to protect this.
132 //
133 // event_init() should only ever be called once.  Subsequent calls
134 // should be made to event_base_new().  We can recognise that
135 // event_init() has already been called by simply inspecting current_base.
136 static std::mutex libevent_mutex_;
137
138 /*
139  * EventBase methods
140  */
141
142 EventBase::EventBase()
143   : runOnceCallbacks_(nullptr)
144   , stop_(false)
145   , loopThread_(0)
146   , queue_(nullptr)
147   , fnRunner_(nullptr)
148   , maxLatency_(0)
149   , avgLoopTime_(2000000)
150   , maxLatencyLoopTime_(avgLoopTime_)
151   , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
152   , latestLoopCnt_(nextLoopCnt_)
153   , startWork_(0)
154   , observer_(nullptr)
155   , observerSampleCount_(0) {
156   {
157     std::lock_guard<std::mutex> lock(libevent_mutex_);
158
159     // The value 'current_base' (libevent 1) or
160     // 'event_global_current_base_' (libevent 2) is filled in by event_set(),
161     // allowing examination of its value without an explicit reference here.
162     // If ev.ev_base is NULL, then event_init() must be called, otherwise
163     // call event_base_new().
164     struct event ev;
165     event_set(&ev, 0, 0, nullptr, nullptr);
166     evb_ = (ev.ev_base) ? event_base_new() : event_init();
167   }
168   if (UNLIKELY(evb_ == nullptr)) {
169     LOG(ERROR) << "EventBase(): Failed to init event base.";
170     folly::throwSystemError("error in EventBase::EventBase()");
171   }
172   VLOG(5) << "EventBase(): Created.";
173   initNotificationQueue();
174   RequestContext::getStaticContext();
175 }
176
177 // takes ownership of the event_base
178 EventBase::EventBase(event_base* evb)
179   : runOnceCallbacks_(nullptr)
180   , stop_(false)
181   , loopThread_(0)
182   , evb_(evb)
183   , queue_(nullptr)
184   , fnRunner_(nullptr)
185   , maxLatency_(0)
186   , avgLoopTime_(2000000)
187   , maxLatencyLoopTime_(avgLoopTime_)
188   , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
189   , latestLoopCnt_(nextLoopCnt_)
190   , startWork_(0)
191   , observer_(nullptr)
192   , observerSampleCount_(0) {
193   if (UNLIKELY(evb_ == nullptr)) {
194     LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
195     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
196   }
197   initNotificationQueue();
198   RequestContext::getStaticContext();
199 }
200
201 EventBase::~EventBase() {
202   // Call all destruction callbacks, before we start cleaning up our state.
203   while (!onDestructionCallbacks_.empty()) {
204     LoopCallback* callback = &onDestructionCallbacks_.front();
205     onDestructionCallbacks_.pop_front();
206     callback->runLoopCallback();
207   }
208
209   // Delete any unfired callback objects, so that we don't leak memory
210   // (Note that we don't fire them.  The caller is responsible for cleaning up
211   // its own data structures if it destroys the EventBase with unfired events
212   // remaining.)
213   while (!pendingCobTimeouts_.empty()) {
214     CobTimeout* timeout = &pendingCobTimeouts_.front();
215     delete timeout;
216   }
217
218   while (!runBeforeLoopCallbacks_.empty()) {
219     delete &runBeforeLoopCallbacks_.front();
220   }
221
222   (void) runLoopCallbacks(false);
223
224   if (!fnRunner_->consumeUntilDrained()) {
225     LOG(ERROR) << "~EventBase(): Unable to drain notification queue";
226   }
227
228   // Stop consumer before deleting NotificationQueue
229   fnRunner_->stopConsuming();
230   {
231     std::lock_guard<std::mutex> lock(libevent_mutex_);
232     event_base_free(evb_);
233   }
234   VLOG(5) << "EventBase(): Destroyed.";
235 }
236
237 int EventBase::getNotificationQueueSize() const {
238   return queue_->size();
239 }
240
241 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
242   fnRunner_->setMaxReadAtOnce(maxAtOnce);
243 }
244
245 // Set smoothing coefficient for loop load average; input is # of milliseconds
246 // for exp(-1) decay.
247 void EventBase::setLoadAvgMsec(uint32_t ms) {
248   uint64_t us = 1000 * ms;
249   if (ms > 0) {
250     maxLatencyLoopTime_.setTimeInterval(us);
251     avgLoopTime_.setTimeInterval(us);
252   } else {
253     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
254   }
255 }
256
257 void EventBase::resetLoadAvg(double value) {
258   avgLoopTime_.reset(value);
259   maxLatencyLoopTime_.reset(value);
260 }
261
262 static std::chrono::milliseconds
263 getTimeDelta(std::chrono::steady_clock::time_point* prev) {
264   auto result = std::chrono::steady_clock::now() - *prev;
265   *prev = std::chrono::steady_clock::now();
266
267   return std::chrono::duration_cast<std::chrono::milliseconds>(result);
268 }
269
270 void EventBase::waitUntilRunning() {
271   while (!isRunning()) {
272     sched_yield();
273   }
274 }
275
276 // enters the event_base loop -- will only exit when forced to
277 bool EventBase::loop() {
278   return loopBody();
279 }
280
281 bool EventBase::loopOnce(int flags) {
282   return loopBody(flags | EVLOOP_ONCE);
283 }
284
285 bool EventBase::loopBody(int flags) {
286   VLOG(5) << "EventBase(): Starting loop.";
287   int res = 0;
288   bool ranLoopCallbacks;
289   bool blocking = !(flags & EVLOOP_NONBLOCK);
290   bool once = (flags & EVLOOP_ONCE);
291
292   loopThread_.store(pthread_self(), std::memory_order_release);
293
294   if (!name_.empty()) {
295     setThreadName(name_);
296   }
297
298   auto prev = std::chrono::steady_clock::now();
299   int64_t idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
300     std::chrono::steady_clock::now().time_since_epoch()).count();
301
302   // TODO: Read stop_ atomically with an acquire barrier.
303   while (!stop_) {
304     ++nextLoopCnt_;
305
306     // Run the before loop callbacks
307     LoopCallbackList callbacks;
308     callbacks.swap(runBeforeLoopCallbacks_);
309
310     while(!callbacks.empty()) {
311       auto* item = &callbacks.front();
312       callbacks.pop_front();
313       item->runLoopCallback();
314     }
315
316     // nobody can add loop callbacks from within this thread if
317     // we don't have to handle anything to start with...
318     if (blocking && loopCallbacks_.empty()) {
319       res = event_base_loop(evb_, EVLOOP_ONCE);
320     } else {
321       res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
322     }
323
324     ranLoopCallbacks = runLoopCallbacks();
325
326     int64_t busy = std::chrono::duration_cast<std::chrono::microseconds>(
327       std::chrono::steady_clock::now().time_since_epoch()).count() - startWork_;
328     int64_t idle = startWork_ - idleStart;
329
330     avgLoopTime_.addSample(idle, busy);
331     maxLatencyLoopTime_.addSample(idle, busy);
332
333     if (observer_) {
334       if (observerSampleCount_++ == observer_->getSampleRate()) {
335         observerSampleCount_ = 0;
336         observer_->loopSample(busy, idle);
337       }
338     }
339
340     VLOG(11) << "EventBase " << this         << " did not timeout "
341      " loop time guess: "    << busy + idle  <<
342      " idle time: "          << idle         <<
343      " busy time: "          << busy         <<
344      " avgLoopTime: "        << avgLoopTime_.get() <<
345      " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
346      " maxLatency_: "        << maxLatency_ <<
347      " nothingHandledYet(): "<< nothingHandledYet();
348
349     // see if our average loop time has exceeded our limit
350     if ((maxLatency_ > 0) &&
351         (maxLatencyLoopTime_.get() > double(maxLatency_))) {
352       maxLatencyCob_();
353       // back off temporarily -- don't keep spamming maxLatencyCob_
354       // if we're only a bit over the limit
355       maxLatencyLoopTime_.dampen(0.9);
356     }
357
358     // Our loop run did real work; reset the idle timer
359     idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
360       std::chrono::steady_clock::now().time_since_epoch()).count();
361
362     // If the event loop indicate that there were no more events, and
363     // we also didn't have any loop callbacks to run, there is nothing left to
364     // do.
365     if (res != 0 && !ranLoopCallbacks) {
366       // Since Notification Queue is marked 'internal' some events may not have
367       // run.  Run them manually if so, and continue looping.
368       //
369       if (getNotificationQueueSize() > 0) {
370         fnRunner_->handlerReady(0);
371       } else {
372         break;
373       }
374     }
375
376     VLOG(5) << "EventBase " << this << " loop time: " <<
377       getTimeDelta(&prev).count();
378
379     if (once) {
380       break;
381     }
382   }
383   // Reset stop_ so loop() can be called again
384   stop_ = false;
385
386   if (res < 0) {
387     LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
388     return false;
389   } else if (res == 1) {
390     VLOG(5) << "EventBase: ran out of events (exiting loop)!";
391   } else if (res > 1) {
392     LOG(ERROR) << "EventBase: unknown event loop result = " << res;
393     return false;
394   }
395
396   loopThread_.store(0, std::memory_order_release);
397
398   VLOG(5) << "EventBase(): Done with loop.";
399   return true;
400 }
401
402 void EventBase::loopForever() {
403   // Update the notification queue event to treat it as a normal (non-internal)
404   // event.  The notification queue event always remains installed, and the main
405   // loop won't exit with it installed.
406   fnRunner_->stopConsuming();
407   fnRunner_->startConsuming(this, queue_.get());
408
409   bool ret = loop();
410
411   // Restore the notification queue internal flag
412   fnRunner_->stopConsuming();
413   fnRunner_->startConsumingInternal(this, queue_.get());
414
415   if (!ret) {
416     folly::throwSystemError("error in EventBase::loopForever()");
417   }
418 }
419
420 bool EventBase::bumpHandlingTime() {
421   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
422     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
423   if(nothingHandledYet()) {
424     latestLoopCnt_ = nextLoopCnt_;
425     // set the time
426     startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
427       std::chrono::steady_clock::now().time_since_epoch()).count();
428
429     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
430       " (loop) startWork_ " << startWork_;
431     return true;
432   }
433   return false;
434 }
435
436 void EventBase::terminateLoopSoon() {
437   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
438
439   // Set stop to true, so the event loop will know to exit.
440   // TODO: We should really use an atomic operation here with a release
441   // barrier.
442   stop_ = true;
443
444   // Call event_base_loopbreak() so that libevent will exit the next time
445   // around the loop.
446   event_base_loopbreak(evb_);
447
448   // If terminateLoopSoon() is called from another thread,
449   // the EventBase thread might be stuck waiting for events.
450   // In this case, it won't wake up and notice that stop_ is set until it
451   // receives another event.  Send an empty frame to the notification queue
452   // so that the event loop will wake up even if there are no other events.
453   //
454   // We don't care about the return value of trySendFrame().  If it fails
455   // this likely means the EventBase already has lots of events waiting
456   // anyway.
457   try {
458     queue_->putMessage(std::make_pair(nullptr, nullptr));
459   } catch (...) {
460     // We don't care if putMessage() fails.  This likely means
461     // the EventBase already has lots of events waiting anyway.
462   }
463 }
464
465 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
466   DCHECK(isInEventBaseThread());
467   callback->cancelLoopCallback();
468   callback->context_ = RequestContext::saveContext();
469   if (runOnceCallbacks_ != nullptr && thisIteration) {
470     runOnceCallbacks_->push_back(*callback);
471   } else {
472     loopCallbacks_.push_back(*callback);
473   }
474 }
475
476 void EventBase::runInLoop(const Cob& cob, bool thisIteration) {
477   DCHECK(isInEventBaseThread());
478   auto wrapper = new FunctionLoopCallback<Cob>(cob);
479   wrapper->context_ = RequestContext::saveContext();
480   if (runOnceCallbacks_ != nullptr && thisIteration) {
481     runOnceCallbacks_->push_back(*wrapper);
482   } else {
483     loopCallbacks_.push_back(*wrapper);
484   }
485 }
486
487 void EventBase::runInLoop(Cob&& cob, bool thisIteration) {
488   DCHECK(isInEventBaseThread());
489   auto wrapper = new FunctionLoopCallback<Cob>(std::move(cob));
490   wrapper->context_ = RequestContext::saveContext();
491   if (runOnceCallbacks_ != nullptr && thisIteration) {
492     runOnceCallbacks_->push_back(*wrapper);
493   } else {
494     loopCallbacks_.push_back(*wrapper);
495   }
496 }
497
498 void EventBase::runOnDestruction(LoopCallback* callback) {
499   DCHECK(isInEventBaseThread());
500   callback->cancelLoopCallback();
501   onDestructionCallbacks_.push_back(*callback);
502 }
503
504 void EventBase::runBeforeLoop(LoopCallback* callback) {
505   DCHECK(isInEventBaseThread());
506   callback->cancelLoopCallback();
507   runBeforeLoopCallbacks_.push_back(*callback);
508 }
509
510 bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
511   // Send the message.
512   // It will be received by the FunctionRunner in the EventBase's thread.
513
514   // We try not to schedule nullptr callbacks
515   if (!fn) {
516     LOG(ERROR) << "EventBase " << this
517                << ": Scheduling nullptr callbacks is not allowed";
518     return false;
519   }
520
521   // Short-circuit if we are already in our event base
522   if (inRunningEventBaseThread()) {
523     runInLoop(new RunInLoopCallback(fn, arg));
524     return true;
525
526   }
527
528   try {
529     queue_->putMessage(std::make_pair(fn, arg));
530   } catch (const std::exception& ex) {
531     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
532                << fn << "for EventBase thread: " << ex.what();
533     return false;
534   }
535
536   return true;
537 }
538
539 bool EventBase::runInEventBaseThread(const Cob& fn) {
540   // Short-circuit if we are already in our event base
541   if (inRunningEventBaseThread()) {
542     runInLoop(fn);
543     return true;
544   }
545
546   Cob* fnCopy;
547   // Allocate a copy of the function so we can pass it to the other thread
548   // The other thread will delete this copy once the function has been run
549   try {
550     fnCopy = new Cob(fn);
551   } catch (const std::bad_alloc& ex) {
552     LOG(ERROR) << "failed to allocate tr::function copy "
553                << "for runInEventBaseThread()";
554     return false;
555   }
556
557   if (!runInEventBaseThread(&EventBase::runFunctionPtr, fnCopy)) {
558     delete fnCopy;
559     return false;
560   }
561
562   return true;
563 }
564
565 bool EventBase::runAfterDelay(const Cob& cob,
566                                int milliseconds,
567                                TimeoutManager::InternalEnum in) {
568   CobTimeout* timeout = new CobTimeout(this, cob, in);
569   if (!timeout->scheduleTimeout(milliseconds)) {
570     delete timeout;
571     return false;
572   }
573
574   pendingCobTimeouts_.push_back(*timeout);
575   return true;
576 }
577
578 bool EventBase::runLoopCallbacks(bool setContext) {
579   if (!loopCallbacks_.empty()) {
580     bumpHandlingTime();
581     // Swap the loopCallbacks_ list with a temporary list on our stack.
582     // This way we will only run callbacks scheduled at the time
583     // runLoopCallbacks() was invoked.
584     //
585     // If any of these callbacks in turn call runInLoop() to schedule more
586     // callbacks, those new callbacks won't be run until the next iteration
587     // around the event loop.  This prevents runInLoop() callbacks from being
588     // able to start file descriptor and timeout based events.
589     LoopCallbackList currentCallbacks;
590     currentCallbacks.swap(loopCallbacks_);
591     runOnceCallbacks_ = &currentCallbacks;
592
593     while (!currentCallbacks.empty()) {
594       LoopCallback* callback = &currentCallbacks.front();
595       currentCallbacks.pop_front();
596       if (setContext) {
597         RequestContext::setContext(callback->context_);
598       }
599       callback->runLoopCallback();
600     }
601
602     runOnceCallbacks_ = nullptr;
603     return true;
604   }
605   return false;
606 }
607
608 void EventBase::initNotificationQueue() {
609   // Infinite size queue
610   queue_.reset(new NotificationQueue<std::pair<void (*)(void*), void*>>());
611
612   // We allocate fnRunner_ separately, rather than declaring it directly
613   // as a member of EventBase solely so that we don't need to include
614   // NotificationQueue.h from EventBase.h
615   fnRunner_.reset(new FunctionRunner());
616
617   // Mark this as an internal event, so event_base_loop() will return if
618   // there are no other events besides this one installed.
619   //
620   // Most callers don't care about the internal notification queue used by
621   // EventBase.  The queue is always installed, so if we did count the queue as
622   // an active event, loop() would never exit with no more events to process.
623   // Users can use loopForever() if they do care about the notification queue.
624   // (This is useful for EventBase threads that do nothing but process
625   // runInEventBaseThread() notifications.)
626   fnRunner_->startConsumingInternal(this, queue_.get());
627 }
628
629 void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) {
630   expCoeff_ = -1.0/timeInterval;
631   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
632 }
633
634 void EventBase::SmoothLoopTime::reset(double value) {
635   value_ = value;
636 }
637
638 void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) {
639     /*
640      * Position at which the busy sample is considered to be taken.
641      * (Allows to quickly skew our average without editing much code)
642      */
643     enum BusySamplePosition {
644       RIGHT = 0,  // busy sample placed at the end of the iteration
645       CENTER = 1, // busy sample placed at the middle point of the iteration
646       LEFT = 2,   // busy sample placed at the beginning of the iteration
647     };
648
649   VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ <<
650               " idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ <<
651               " busy " << busy << " " << __PRETTY_FUNCTION__;
652   idle += oldBusyLeftover_ + busy;
653   oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
654   idle -= oldBusyLeftover_;
655
656   double coeff = exp(idle * expCoeff_);
657   value_ *= coeff;
658   value_ += (1.0 - coeff) * busy;
659 }
660
661 bool EventBase::nothingHandledYet() {
662   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
663   return (nextLoopCnt_ != latestLoopCnt_);
664 }
665
666 /* static */
667 void EventBase::runFunctionPtr(Cob* fn) {
668   // The function should never throw an exception, because we have no
669   // way of knowing what sort of error handling to perform.
670   //
671   // If it does throw, log a message and abort the program.
672   try {
673     (*fn)();
674   } catch (const std::exception &ex) {
675     LOG(ERROR) << "runInEventBaseThread() std::function threw a "
676                << typeid(ex).name() << " exception: " << ex.what();
677     abort();
678   } catch (...) {
679     LOG(ERROR) << "runInEventBaseThread() std::function threw an exception";
680     abort();
681   }
682
683   // The function object was allocated by runInEventBaseThread().
684   // Delete it once it has been run.
685   delete fn;
686 }
687
688 EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg)
689     : fn_(fn)
690     , arg_(arg) {}
691
692 void EventBase::RunInLoopCallback::runLoopCallback() noexcept {
693   fn_(arg_);
694   delete this;
695 }
696
697 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
698                                       InternalEnum internal) {
699
700   struct event* ev = obj->getEvent();
701   assert(ev->ev_base == nullptr);
702
703   event_base_set(getLibeventBase(), ev);
704   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
705     // Set the EVLIST_INTERNAL flag
706     ev->ev_flags |= EVLIST_INTERNAL;
707   }
708 }
709
710 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
711   cancelTimeout(obj);
712   struct event* ev = obj->getEvent();
713   ev->ev_base = nullptr;
714 }
715
716 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
717                                  std::chrono::milliseconds timeout) {
718   assert(isInEventBaseThread());
719   // Set up the timeval and add the event
720   struct timeval tv;
721   tv.tv_sec = timeout.count() / 1000LL;
722   tv.tv_usec = (timeout.count() % 1000LL) * 1000LL;
723
724   struct event* ev = obj->getEvent();
725   if (event_add(ev, &tv) < 0) {
726     LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
727     return false;
728   }
729
730   return true;
731 }
732
733 void EventBase::cancelTimeout(AsyncTimeout* obj) {
734   assert(isInEventBaseThread());
735   struct event* ev = obj->getEvent();
736   if (EventUtil::isEventRegistered(ev)) {
737     event_del(ev);
738   }
739 }
740
741 void EventBase::setName(const std::string& name) {
742   assert(isInEventBaseThread());
743   name_ = name;
744
745   if (isRunning()) {
746     setThreadName(loopThread_.load(std::memory_order_relaxed),
747                   name_);
748   }
749 }
750
751 const std::string& EventBase::getName() {
752   assert(isInEventBaseThread());
753   return name_;
754 }
755
756 const char* EventBase::getLibeventVersion() { return event_get_version(); }
757 const char* EventBase::getLibeventMethod() { return event_get_method(); }
758
759 } // folly