use HHWheelTimer for EventBase::runAfterDelay
[folly.git] / folly / io / async / EventBase.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include <folly/io/async/EventBase.h>
22
23 #include <folly/ThreadName.h>
24 #include <folly/io/async/NotificationQueue.h>
25
26 #include <boost/static_assert.hpp>
27 #include <condition_variable>
28 #include <fcntl.h>
29 #include <mutex>
30 #include <pthread.h>
31 #include <unistd.h>
32
33 namespace {
34
35 using folly::Cob;
36 using folly::EventBase;
37
38 template <typename Callback>
39 class FunctionLoopCallback : public EventBase::LoopCallback {
40  public:
41   explicit FunctionLoopCallback(Cob&& function)
42       : function_(std::move(function)) {}
43
44   explicit FunctionLoopCallback(const Cob& function)
45       : function_(function) {}
46
47   void runLoopCallback() noexcept override {
48     function_();
49     delete this;
50   }
51
52  private:
53   Callback function_;
54 };
55
56 }
57
58 namespace folly {
59
60 const int kNoFD = -1;
61
62 /*
63  * EventBase::FunctionRunner
64  */
65
66 class EventBase::FunctionRunner
67     : public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
68  public:
69   void messageAvailable(std::pair<void (*)(void*), void*>&& msg) override {
70
71     // In libevent2, internal events do not break the loop.
72     // Most users would expect loop(), followed by runInEventBaseThread(),
73     // to break the loop and check if it should exit or not.
74     // To have similar bejaviour to libevent1.4, tell the loop to break here.
75     // Note that loop() may still continue to loop, but it will also check the
76     // stop_ flag as well as runInLoop callbacks, etc.
77     event_base_loopbreak(getEventBase()->evb_);
78
79     if (msg.first == nullptr && msg.second == nullptr) {
80       // terminateLoopSoon() sends a null message just to
81       // wake up the loop.  We can ignore these messages.
82       return;
83     }
84
85     // If function is nullptr, just log and move on
86     if (!msg.first) {
87       LOG(ERROR) << "nullptr callback registered to be run in "
88                  << "event base thread";
89       return;
90     }
91
92     // The function should never throw an exception, because we have no
93     // way of knowing what sort of error handling to perform.
94     //
95     // If it does throw, log a message and abort the program.
96     try {
97       msg.first(msg.second);
98     } catch (const std::exception& ex) {
99       LOG(ERROR) << "runInEventBaseThread() function threw a "
100                  << typeid(ex).name() << " exception: " << ex.what();
101       abort();
102     } catch (...) {
103       LOG(ERROR) << "runInEventBaseThread() function threw an exception";
104       abort();
105     }
106   }
107 };
108
109 /*
110  * EventBase::CobTimeout methods
111  */
112
113 void EventBase::CobTimeout::timeoutExpired() noexcept {
114   // For now, we just swallow any exceptions that the callback threw.
115   try {
116     cob_();
117   } catch (const std::exception& ex) {
118     LOG(ERROR) << "EventBase::runAfterDelay() callback threw "
119                << typeid(ex).name() << " exception: " << ex.what();
120   } catch (...) {
121     LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception "
122                << "type";
123   }
124
125   // The CobTimeout object was allocated on the heap by runAfterDelay(),
126   // so delete it now that it has fired.
127   delete this;
128 }
129
130 void EventBase::CobTimeout::callbackCanceled() noexcept {
131   // The CobTimeout object was allocated on the heap by runAfterDelay(),
132   // so delete it now that it has been canceled.
133   delete this;
134 }
135
136
137 // The interface used to libevent is not thread-safe.  Calls to
138 // event_init() and event_base_free() directly modify an internal
139 // global 'current_base', so a mutex is required to protect this.
140 //
141 // event_init() should only ever be called once.  Subsequent calls
142 // should be made to event_base_new().  We can recognise that
143 // event_init() has already been called by simply inspecting current_base.
144 static std::mutex libevent_mutex_;
145
146 /*
147  * EventBase methods
148  */
149
150 EventBase::EventBase(bool enableTimeMeasurement)
151   : runOnceCallbacks_(nullptr)
152   , stop_(false)
153   , loopThread_(0)
154   , queue_(nullptr)
155   , fnRunner_(nullptr)
156   , maxLatency_(0)
157   , avgLoopTime_(2000000)
158   , maxLatencyLoopTime_(avgLoopTime_)
159   , enableTimeMeasurement_(enableTimeMeasurement)
160   , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
161   , latestLoopCnt_(nextLoopCnt_)
162   , startWork_(0)
163   , observer_(nullptr)
164   , observerSampleCount_(0)
165   , executionObserver_(nullptr) {
166   {
167     std::lock_guard<std::mutex> lock(libevent_mutex_);
168
169     // The value 'current_base' (libevent 1) or
170     // 'event_global_current_base_' (libevent 2) is filled in by event_set(),
171     // allowing examination of its value without an explicit reference here.
172     // If ev.ev_base is NULL, then event_init() must be called, otherwise
173     // call event_base_new().
174     struct event ev;
175     event_set(&ev, 0, 0, nullptr, nullptr);
176     evb_ = (ev.ev_base) ? event_base_new() : event_init();
177   }
178   if (UNLIKELY(evb_ == nullptr)) {
179     LOG(ERROR) << "EventBase(): Failed to init event base.";
180     folly::throwSystemError("error in EventBase::EventBase()");
181   }
182   VLOG(5) << "EventBase(): Created.";
183   initNotificationQueue();
184   wheelTimer_ = HHWheelTimer::UniquePtr(new HHWheelTimer(this));
185   RequestContext::saveContext();
186 }
187
188 // takes ownership of the event_base
189 EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
190   : runOnceCallbacks_(nullptr)
191   , stop_(false)
192   , loopThread_(0)
193   , evb_(evb)
194   , queue_(nullptr)
195   , fnRunner_(nullptr)
196   , maxLatency_(0)
197   , avgLoopTime_(2000000)
198   , maxLatencyLoopTime_(avgLoopTime_)
199   , enableTimeMeasurement_(enableTimeMeasurement)
200   , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
201   , latestLoopCnt_(nextLoopCnt_)
202   , startWork_(0)
203   , observer_(nullptr)
204   , observerSampleCount_(0)
205   , executionObserver_(nullptr) {
206   if (UNLIKELY(evb_ == nullptr)) {
207     LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
208     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
209   }
210   initNotificationQueue();
211   wheelTimer_ = HHWheelTimer::UniquePtr(new HHWheelTimer(this));
212   RequestContext::saveContext();
213 }
214
215 EventBase::~EventBase() {
216   // Call all destruction callbacks, before we start cleaning up our state.
217   while (!onDestructionCallbacks_.empty()) {
218     LoopCallback* callback = &onDestructionCallbacks_.front();
219     onDestructionCallbacks_.pop_front();
220     callback->runLoopCallback();
221   }
222
223   // Delete any unfired callback objects, so that we don't leak memory
224   // (Note that we don't fire them.  The caller is responsible for cleaning up
225   // its own data structures if it destroys the EventBase with unfired events
226   // remaining.)
227   wheelTimer_->cancelAll();
228
229   while (!runBeforeLoopCallbacks_.empty()) {
230     delete &runBeforeLoopCallbacks_.front();
231   }
232
233   (void) runLoopCallbacks(false);
234
235   if (!fnRunner_->consumeUntilDrained()) {
236     LOG(ERROR) << "~EventBase(): Unable to drain notification queue";
237   }
238
239   // Stop consumer before deleting NotificationQueue
240   fnRunner_->stopConsuming();
241   {
242     std::lock_guard<std::mutex> lock(libevent_mutex_);
243     event_base_free(evb_);
244   }
245
246   while (!runAfterDrainCallbacks_.empty()) {
247     LoopCallback* callback = &runAfterDrainCallbacks_.front();
248     runAfterDrainCallbacks_.pop_front();
249     callback->runLoopCallback();
250   }
251
252   {
253     std::lock_guard<std::mutex> lock(localStorageMutex_);
254     for (auto storage : localStorageToDtor_) {
255       storage->onEventBaseDestruction(*this);
256     }
257   }
258   VLOG(5) << "EventBase(): Destroyed.";
259 }
260
261 int EventBase::getNotificationQueueSize() const {
262   return queue_->size();
263 }
264
265 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
266   fnRunner_->setMaxReadAtOnce(maxAtOnce);
267 }
268
269 // Set smoothing coefficient for loop load average; input is # of milliseconds
270 // for exp(-1) decay.
271 void EventBase::setLoadAvgMsec(uint32_t ms) {
272   assert(enableTimeMeasurement_);
273   uint64_t us = 1000 * ms;
274   if (ms > 0) {
275     maxLatencyLoopTime_.setTimeInterval(us);
276     avgLoopTime_.setTimeInterval(us);
277   } else {
278     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
279   }
280 }
281
282 void EventBase::resetLoadAvg(double value) {
283   assert(enableTimeMeasurement_);
284   avgLoopTime_.reset(value);
285   maxLatencyLoopTime_.reset(value);
286 }
287
288 static std::chrono::milliseconds
289 getTimeDelta(std::chrono::steady_clock::time_point* prev) {
290   auto result = std::chrono::steady_clock::now() - *prev;
291   *prev = std::chrono::steady_clock::now();
292
293   return std::chrono::duration_cast<std::chrono::milliseconds>(result);
294 }
295
296 void EventBase::waitUntilRunning() {
297   while (!isRunning()) {
298     sched_yield();
299   }
300 }
301
302 // enters the event_base loop -- will only exit when forced to
303 bool EventBase::loop() {
304   return loopBody();
305 }
306
307 bool EventBase::loopOnce(int flags) {
308   return loopBody(flags | EVLOOP_ONCE);
309 }
310
311 bool EventBase::loopBody(int flags) {
312   VLOG(5) << "EventBase(): Starting loop.";
313   int res = 0;
314   bool ranLoopCallbacks;
315   bool blocking = !(flags & EVLOOP_NONBLOCK);
316   bool once = (flags & EVLOOP_ONCE);
317
318   // time-measurement variables.
319   std::chrono::steady_clock::time_point prev;
320   int64_t idleStart;
321   int64_t busy;
322   int64_t idle;
323
324   loopThread_.store(pthread_self(), std::memory_order_release);
325
326   if (!name_.empty()) {
327     setThreadName(name_);
328   }
329
330   if (enableTimeMeasurement_) {
331     prev = std::chrono::steady_clock::now();
332     idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
333       std::chrono::steady_clock::now().time_since_epoch()).count();
334   }
335
336   while (!stop_.load(std::memory_order_acquire)) {
337     ++nextLoopCnt_;
338
339     // Run the before loop callbacks
340     LoopCallbackList callbacks;
341     callbacks.swap(runBeforeLoopCallbacks_);
342
343     while(!callbacks.empty()) {
344       auto* item = &callbacks.front();
345       callbacks.pop_front();
346       item->runLoopCallback();
347     }
348
349     // nobody can add loop callbacks from within this thread if
350     // we don't have to handle anything to start with...
351     if (blocking && loopCallbacks_.empty()) {
352       res = event_base_loop(evb_, EVLOOP_ONCE);
353     } else {
354       res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
355     }
356
357     ranLoopCallbacks = runLoopCallbacks();
358
359     if (enableTimeMeasurement_) {
360       busy = std::chrono::duration_cast<std::chrono::microseconds>(
361         std::chrono::steady_clock::now().time_since_epoch()).count() -
362         startWork_;
363       idle = startWork_ - idleStart;
364
365       avgLoopTime_.addSample(idle, busy);
366       maxLatencyLoopTime_.addSample(idle, busy);
367
368       if (observer_) {
369         if (observerSampleCount_++ == observer_->getSampleRate()) {
370           observerSampleCount_ = 0;
371           observer_->loopSample(busy, idle);
372         }
373       }
374
375       VLOG(11) << "EventBase "  << this         << " did not timeout "
376         " loop time guess: "    << busy + idle  <<
377         " idle time: "          << idle         <<
378         " busy time: "          << busy         <<
379         " avgLoopTime: "        << avgLoopTime_.get() <<
380         " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
381         " maxLatency_: "        << maxLatency_ <<
382         " nothingHandledYet(): "<< nothingHandledYet();
383
384       // see if our average loop time has exceeded our limit
385       if ((maxLatency_ > 0) &&
386           (maxLatencyLoopTime_.get() > double(maxLatency_))) {
387         maxLatencyCob_();
388         // back off temporarily -- don't keep spamming maxLatencyCob_
389         // if we're only a bit over the limit
390         maxLatencyLoopTime_.dampen(0.9);
391       }
392
393       // Our loop run did real work; reset the idle timer
394       idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
395         std::chrono::steady_clock::now().time_since_epoch()).count();
396     } else {
397       VLOG(11) << "EventBase "  << this << " did not timeout "
398         " time measurement is disabled "
399         " nothingHandledYet(): "<< nothingHandledYet();
400     }
401
402     // If the event loop indicate that there were no more events, and
403     // we also didn't have any loop callbacks to run, there is nothing left to
404     // do.
405     if (res != 0 && !ranLoopCallbacks) {
406       // Since Notification Queue is marked 'internal' some events may not have
407       // run.  Run them manually if so, and continue looping.
408       //
409       if (getNotificationQueueSize() > 0) {
410         fnRunner_->handlerReady(0);
411       } else {
412         break;
413       }
414     }
415
416     if (enableTimeMeasurement_) {
417       VLOG(5) << "EventBase " << this << " loop time: " <<
418         getTimeDelta(&prev).count();
419     }
420
421     if (once) {
422       break;
423     }
424   }
425   // Reset stop_ so loop() can be called again
426   stop_ = false;
427
428   if (res < 0) {
429     LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
430     return false;
431   } else if (res == 1) {
432     VLOG(5) << "EventBase: ran out of events (exiting loop)!";
433   } else if (res > 1) {
434     LOG(ERROR) << "EventBase: unknown event loop result = " << res;
435     return false;
436   }
437
438   loopThread_.store(0, std::memory_order_release);
439
440   VLOG(5) << "EventBase(): Done with loop.";
441   return true;
442 }
443
444 void EventBase::loopForever() {
445   // Update the notification queue event to treat it as a normal (non-internal)
446   // event.  The notification queue event always remains installed, and the main
447   // loop won't exit with it installed.
448   fnRunner_->stopConsuming();
449   fnRunner_->startConsuming(this, queue_.get());
450
451   bool ret = loop();
452
453   // Restore the notification queue internal flag
454   fnRunner_->stopConsuming();
455   fnRunner_->startConsumingInternal(this, queue_.get());
456
457   if (!ret) {
458     folly::throwSystemError("error in EventBase::loopForever()");
459   }
460 }
461
462 bool EventBase::bumpHandlingTime() {
463   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
464     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
465   if(nothingHandledYet()) {
466     latestLoopCnt_ = nextLoopCnt_;
467     // set the time
468     startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
469       std::chrono::steady_clock::now().time_since_epoch()).count();
470
471     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
472       " (loop) startWork_ " << startWork_;
473     return true;
474   }
475   return false;
476 }
477
478 void EventBase::terminateLoopSoon() {
479   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
480
481   // Set stop to true, so the event loop will know to exit.
482   // TODO: We should really use an atomic operation here with a release
483   // barrier.
484   stop_ = true;
485
486   // Call event_base_loopbreak() so that libevent will exit the next time
487   // around the loop.
488   event_base_loopbreak(evb_);
489
490   // If terminateLoopSoon() is called from another thread,
491   // the EventBase thread might be stuck waiting for events.
492   // In this case, it won't wake up and notice that stop_ is set until it
493   // receives another event.  Send an empty frame to the notification queue
494   // so that the event loop will wake up even if there are no other events.
495   //
496   // We don't care about the return value of trySendFrame().  If it fails
497   // this likely means the EventBase already has lots of events waiting
498   // anyway.
499   try {
500     queue_->putMessage(std::make_pair(nullptr, nullptr));
501   } catch (...) {
502     // We don't care if putMessage() fails.  This likely means
503     // the EventBase already has lots of events waiting anyway.
504   }
505 }
506
507 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
508   DCHECK(isInEventBaseThread());
509   callback->cancelLoopCallback();
510   callback->context_ = RequestContext::saveContext();
511   if (runOnceCallbacks_ != nullptr && thisIteration) {
512     runOnceCallbacks_->push_back(*callback);
513   } else {
514     loopCallbacks_.push_back(*callback);
515   }
516 }
517
518 void EventBase::runInLoop(const Cob& cob, bool thisIteration) {
519   DCHECK(isInEventBaseThread());
520   auto wrapper = new FunctionLoopCallback<Cob>(cob);
521   wrapper->context_ = RequestContext::saveContext();
522   if (runOnceCallbacks_ != nullptr && thisIteration) {
523     runOnceCallbacks_->push_back(*wrapper);
524   } else {
525     loopCallbacks_.push_back(*wrapper);
526   }
527 }
528
529 void EventBase::runInLoop(Cob&& cob, bool thisIteration) {
530   DCHECK(isInEventBaseThread());
531   auto wrapper = new FunctionLoopCallback<Cob>(std::move(cob));
532   wrapper->context_ = RequestContext::saveContext();
533   if (runOnceCallbacks_ != nullptr && thisIteration) {
534     runOnceCallbacks_->push_back(*wrapper);
535   } else {
536     loopCallbacks_.push_back(*wrapper);
537   }
538 }
539
540 void EventBase::runAfterDrain(Cob&& cob) {
541   auto callback = new FunctionLoopCallback<Cob>(std::move(cob));
542   std::lock_guard<std::mutex> lg(runAfterDrainCallbacksMutex_);
543   callback->cancelLoopCallback();
544   runAfterDrainCallbacks_.push_back(*callback);
545 }
546
547 void EventBase::runOnDestruction(LoopCallback* callback) {
548   std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
549   callback->cancelLoopCallback();
550   onDestructionCallbacks_.push_back(*callback);
551 }
552
553 void EventBase::runBeforeLoop(LoopCallback* callback) {
554   DCHECK(isInEventBaseThread());
555   callback->cancelLoopCallback();
556   runBeforeLoopCallbacks_.push_back(*callback);
557 }
558
559 bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
560   // Send the message.
561   // It will be received by the FunctionRunner in the EventBase's thread.
562
563   // We try not to schedule nullptr callbacks
564   if (!fn) {
565     LOG(ERROR) << "EventBase " << this
566                << ": Scheduling nullptr callbacks is not allowed";
567     return false;
568   }
569
570   // Short-circuit if we are already in our event base
571   if (inRunningEventBaseThread()) {
572     runInLoop(new RunInLoopCallback(fn, arg));
573     return true;
574
575   }
576
577   try {
578     queue_->putMessage(std::make_pair(fn, arg));
579   } catch (const std::exception& ex) {
580     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
581                << fn << "for EventBase thread: " << ex.what();
582     return false;
583   }
584
585   return true;
586 }
587
588 bool EventBase::runInEventBaseThread(const Cob& fn) {
589   // Short-circuit if we are already in our event base
590   if (inRunningEventBaseThread()) {
591     runInLoop(fn);
592     return true;
593   }
594
595   Cob* fnCopy;
596   // Allocate a copy of the function so we can pass it to the other thread
597   // The other thread will delete this copy once the function has been run
598   try {
599     fnCopy = new Cob(fn);
600   } catch (const std::bad_alloc& ex) {
601     LOG(ERROR) << "failed to allocate tr::function copy "
602                << "for runInEventBaseThread()";
603     return false;
604   }
605
606   if (!runInEventBaseThread(&EventBase::runFunctionPtr, fnCopy)) {
607     delete fnCopy;
608     return false;
609   }
610
611   return true;
612 }
613
614 bool EventBase::runInEventBaseThreadAndWait(const Cob& fn) {
615   if (inRunningEventBaseThread()) {
616     LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
617                << "allowed";
618     return false;
619   }
620
621   bool ready = false;
622   std::mutex m;
623   std::condition_variable cv;
624   runInEventBaseThread([&] {
625       SCOPE_EXIT {
626         std::unique_lock<std::mutex> l(m);
627         ready = true;
628         cv.notify_one();
629         // We cannot release the lock before notify_one, because a spurious
630         // wakeup in the waiting thread may lead to cv and m going out of scope
631         // prematurely.
632       };
633       fn();
634   });
635   std::unique_lock<std::mutex> l(m);
636   cv.wait(l, [&] { return ready; });
637
638   return true;
639 }
640
641 bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(const Cob& fn) {
642   if (isInEventBaseThread()) {
643     fn();
644     return true;
645   } else {
646     return runInEventBaseThreadAndWait(fn);
647   }
648 }
649
650 void EventBase::runAfterDelay(const Cob& cob,
651                               int milliseconds,
652                               TimeoutManager::InternalEnum in) {
653   if (!tryRunAfterDelay(cob, milliseconds, in)) {
654     folly::throwSystemError(
655       "error in EventBase::runAfterDelay(), failed to schedule timeout");
656   }
657 }
658
659 bool EventBase::tryRunAfterDelay(const Cob& cob,
660                                  int milliseconds,
661                                  TimeoutManager::InternalEnum in) {
662   // A previous implementation could fail, and the API is retained for
663   // backwards compatibility.
664   wheelTimer_->scheduleTimeout(
665       new CobTimeout(cob),
666       std::chrono::milliseconds(milliseconds));
667   return true;
668 }
669
670 bool EventBase::runLoopCallbacks(bool setContext) {
671   if (!loopCallbacks_.empty()) {
672     bumpHandlingTime();
673     // Swap the loopCallbacks_ list with a temporary list on our stack.
674     // This way we will only run callbacks scheduled at the time
675     // runLoopCallbacks() was invoked.
676     //
677     // If any of these callbacks in turn call runInLoop() to schedule more
678     // callbacks, those new callbacks won't be run until the next iteration
679     // around the event loop.  This prevents runInLoop() callbacks from being
680     // able to start file descriptor and timeout based events.
681     LoopCallbackList currentCallbacks;
682     currentCallbacks.swap(loopCallbacks_);
683     runOnceCallbacks_ = &currentCallbacks;
684
685     while (!currentCallbacks.empty()) {
686       LoopCallback* callback = &currentCallbacks.front();
687       currentCallbacks.pop_front();
688       if (setContext) {
689         RequestContext::setContext(callback->context_);
690       }
691       callback->runLoopCallback();
692     }
693
694     runOnceCallbacks_ = nullptr;
695     return true;
696   }
697   return false;
698 }
699
700 void EventBase::initNotificationQueue() {
701   // Infinite size queue
702   queue_.reset(new NotificationQueue<std::pair<void (*)(void*), void*>>());
703
704   // We allocate fnRunner_ separately, rather than declaring it directly
705   // as a member of EventBase solely so that we don't need to include
706   // NotificationQueue.h from EventBase.h
707   fnRunner_.reset(new FunctionRunner());
708
709   // Mark this as an internal event, so event_base_loop() will return if
710   // there are no other events besides this one installed.
711   //
712   // Most callers don't care about the internal notification queue used by
713   // EventBase.  The queue is always installed, so if we did count the queue as
714   // an active event, loop() would never exit with no more events to process.
715   // Users can use loopForever() if they do care about the notification queue.
716   // (This is useful for EventBase threads that do nothing but process
717   // runInEventBaseThread() notifications.)
718   fnRunner_->startConsumingInternal(this, queue_.get());
719 }
720
721 void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) {
722   expCoeff_ = -1.0/timeInterval;
723   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
724 }
725
726 void EventBase::SmoothLoopTime::reset(double value) {
727   value_ = value;
728 }
729
730 void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) {
731     /*
732      * Position at which the busy sample is considered to be taken.
733      * (Allows to quickly skew our average without editing much code)
734      */
735     enum BusySamplePosition {
736       RIGHT = 0,  // busy sample placed at the end of the iteration
737       CENTER = 1, // busy sample placed at the middle point of the iteration
738       LEFT = 2,   // busy sample placed at the beginning of the iteration
739     };
740
741   // See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
742   // and D676020 for more info on this calculation.
743   VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ <<
744               " idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ <<
745               " busy " << busy << " " << __PRETTY_FUNCTION__;
746   idle += oldBusyLeftover_ + busy;
747   oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
748   idle -= oldBusyLeftover_;
749
750   double coeff = exp(idle * expCoeff_);
751   value_ *= coeff;
752   value_ += (1.0 - coeff) * busy;
753 }
754
755 bool EventBase::nothingHandledYet() {
756   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
757   return (nextLoopCnt_ != latestLoopCnt_);
758 }
759
760 /* static */
761 void EventBase::runFunctionPtr(Cob* fn) {
762   // The function should never throw an exception, because we have no
763   // way of knowing what sort of error handling to perform.
764   //
765   // If it does throw, log a message and abort the program.
766   try {
767     (*fn)();
768   } catch (const std::exception &ex) {
769     LOG(ERROR) << "runInEventBaseThread() std::function threw a "
770                << typeid(ex).name() << " exception: " << ex.what();
771     abort();
772   } catch (...) {
773     LOG(ERROR) << "runInEventBaseThread() std::function threw an exception";
774     abort();
775   }
776
777   // The function object was allocated by runInEventBaseThread().
778   // Delete it once it has been run.
779   delete fn;
780 }
781
782 EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg)
783     : fn_(fn)
784     , arg_(arg) {}
785
786 void EventBase::RunInLoopCallback::runLoopCallback() noexcept {
787   fn_(arg_);
788   delete this;
789 }
790
791 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
792                                       InternalEnum internal) {
793
794   struct event* ev = obj->getEvent();
795   assert(ev->ev_base == nullptr);
796
797   event_base_set(getLibeventBase(), ev);
798   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
799     // Set the EVLIST_INTERNAL flag
800     ev->ev_flags |= EVLIST_INTERNAL;
801   }
802 }
803
804 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
805   cancelTimeout(obj);
806   struct event* ev = obj->getEvent();
807   ev->ev_base = nullptr;
808 }
809
810 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
811                                  TimeoutManager::timeout_type timeout) {
812   assert(isInEventBaseThread());
813   // Set up the timeval and add the event
814   struct timeval tv;
815   tv.tv_sec = timeout.count() / 1000LL;
816   tv.tv_usec = (timeout.count() % 1000LL) * 1000LL;
817
818   struct event* ev = obj->getEvent();
819   if (event_add(ev, &tv) < 0) {
820     LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
821     return false;
822   }
823
824   return true;
825 }
826
827 void EventBase::cancelTimeout(AsyncTimeout* obj) {
828   assert(isInEventBaseThread());
829   struct event* ev = obj->getEvent();
830   if (EventUtil::isEventRegistered(ev)) {
831     event_del(ev);
832   }
833 }
834
835 void EventBase::setName(const std::string& name) {
836   assert(isInEventBaseThread());
837   name_ = name;
838
839   if (isRunning()) {
840     setThreadName(loopThread_.load(std::memory_order_relaxed),
841                   name_);
842   }
843 }
844
845 const std::string& EventBase::getName() {
846   assert(isInEventBaseThread());
847   return name_;
848 }
849
850 const char* EventBase::getLibeventVersion() { return event_get_version(); }
851 const char* EventBase::getLibeventMethod() { return event_get_method(); }
852
853 } // folly