130ba5fbc2b21be360ddb7b598434cf6548618a8
[folly.git] / folly / io / async / EventBase.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include "folly/io/async/EventBase.h"
22
23 #include "folly/ThreadName.h"
24 #include "folly/io/async/NotificationQueue.h"
25
26 #include <boost/static_assert.hpp>
27 #include <fcntl.h>
28 #include <pthread.h>
29 #include <unistd.h>
30
31 namespace {
32
33 using folly::Cob;
34 using folly::EventBase;
35
36 template <typename Callback>
37 class FunctionLoopCallback : public EventBase::LoopCallback {
38  public:
39   explicit FunctionLoopCallback(Cob&& function)
40       : function_(std::move(function)) {}
41
42   explicit FunctionLoopCallback(const Cob& function)
43       : function_(function) {}
44
45   virtual void runLoopCallback() noexcept {
46     function_();
47     delete this;
48   }
49
50  private:
51   Callback function_;
52 };
53
54 }
55
56 namespace folly {
57
58 const int kNoFD = -1;
59
60 /*
61  * EventBase::FunctionRunner
62  */
63
64 class EventBase::FunctionRunner
65     : public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
66  public:
67   void messageAvailable(std::pair<void (*)(void*), void*>&& msg) {
68
69     // In libevent2, internal events do not break the loop.
70     // Most users would expect loop(), followed by runInEventBaseThread(),
71     // to break the loop and check if it should exit or not.
72     // To have similar bejaviour to libevent1.4, tell the loop to break here.
73     // Note that loop() may still continue to loop, but it will also check the
74     // stop_ flag as well as runInLoop callbacks, etc.
75     event_base_loopbreak(getEventBase()->evb_);
76
77     if (msg.first == nullptr && msg.second == nullptr) {
78       // terminateLoopSoon() sends a null message just to
79       // wake up the loop.  We can ignore these messages.
80       return;
81     }
82
83     // If function is nullptr, just log and move on
84     if (!msg.first) {
85       LOG(ERROR) << "nullptr callback registered to be run in "
86                  << "event base thread";
87       return;
88     }
89
90     // The function should never throw an exception, because we have no
91     // way of knowing what sort of error handling to perform.
92     //
93     // If it does throw, log a message and abort the program.
94     try {
95       msg.first(msg.second);
96     } catch (const std::exception& ex) {
97       LOG(ERROR) << "runInEventBaseThread() function threw a "
98                  << typeid(ex).name() << " exception: " << ex.what();
99       abort();
100     } catch (...) {
101       LOG(ERROR) << "runInEventBaseThread() function threw an exception";
102       abort();
103     }
104   }
105 };
106
107 /*
108  * EventBase::CobTimeout methods
109  */
110
111 void EventBase::CobTimeout::timeoutExpired() noexcept {
112   // For now, we just swallow any exceptions that the callback threw.
113   try {
114     cob_();
115   } catch (const std::exception& ex) {
116     LOG(ERROR) << "EventBase::runAfterDelay() callback threw "
117                << typeid(ex).name() << " exception: " << ex.what();
118   } catch (...) {
119     LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception "
120                << "type";
121   }
122
123   // The CobTimeout object was allocated on the heap by runAfterDelay(),
124   // so delete it now that the it has fired.
125   delete this;
126 }
127
128 /*
129  * EventBase methods
130  */
131
132 EventBase::EventBase()
133   : runOnceCallbacks_(nullptr)
134   , stop_(false)
135   , loopThread_(0)
136   , evb_(static_cast<event_base*>(event_init()))
137   , queue_(nullptr)
138   , fnRunner_(nullptr)
139   , maxLatency_(0)
140   , avgLoopTime_(2000000)
141   , maxLatencyLoopTime_(avgLoopTime_)
142   , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
143   , latestLoopCnt_(nextLoopCnt_)
144   , startWork_(0)
145   , observer_(nullptr)
146   , observerSampleCount_(0) {
147   if (UNLIKELY(evb_ == nullptr)) {
148     LOG(ERROR) << "EventBase(): Failed to init event base.";
149     folly::throwSystemError("error in EventBase::EventBase()");
150   }
151   VLOG(5) << "EventBase(): Created.";
152   initNotificationQueue();
153   RequestContext::getStaticContext();
154 }
155
156 // takes ownership of the event_base
157 EventBase::EventBase(event_base* evb)
158   : runOnceCallbacks_(nullptr)
159   , stop_(false)
160   , loopThread_(0)
161   , evb_(evb)
162   , queue_(nullptr)
163   , fnRunner_(nullptr)
164   , maxLatency_(0)
165   , avgLoopTime_(2000000)
166   , maxLatencyLoopTime_(avgLoopTime_)
167   , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
168   , latestLoopCnt_(nextLoopCnt_)
169   , startWork_(0)
170   , observer_(nullptr)
171   , observerSampleCount_(0) {
172   if (UNLIKELY(evb_ == nullptr)) {
173     LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
174     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
175   }
176   initNotificationQueue();
177   RequestContext::getStaticContext();
178 }
179
180 EventBase::~EventBase() {
181   // Delete any unfired CobTimeout objects, so that we don't leak memory
182   // (Note that we don't fire them.  The caller is responsible for cleaning up
183   // its own data structures if it destroys the EventBase with unfired events
184   // remaining.)
185   while (!pendingCobTimeouts_.empty()) {
186     CobTimeout* timeout = &pendingCobTimeouts_.front();
187     delete timeout;
188   }
189
190   (void) runLoopCallbacks(false);
191
192   // Stop consumer before deleting NotificationQueue
193   fnRunner_->stopConsuming();
194   event_base_free(evb_);
195   VLOG(5) << "EventBase(): Destroyed.";
196 }
197
198 int EventBase::getNotificationQueueSize() const {
199   return queue_->size();
200 }
201
202 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
203   fnRunner_->setMaxReadAtOnce(maxAtOnce);
204 }
205
206 // Set smoothing coefficient for loop load average; input is # of milliseconds
207 // for exp(-1) decay.
208 void EventBase::setLoadAvgMsec(uint32_t ms) {
209   uint64_t us = 1000 * ms;
210   if (ms > 0) {
211     maxLatencyLoopTime_.setTimeInterval(us);
212     avgLoopTime_.setTimeInterval(us);
213   } else {
214     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
215   }
216 }
217
218 void EventBase::resetLoadAvg(double value) {
219   avgLoopTime_.reset(value);
220   maxLatencyLoopTime_.reset(value);
221 }
222
223 static std::chrono::milliseconds
224 getTimeDelta(std::chrono::steady_clock::time_point* prev) {
225   auto result = std::chrono::steady_clock::now() - *prev;
226   *prev = std::chrono::steady_clock::now();
227
228   return std::chrono::duration_cast<std::chrono::milliseconds>(result);
229 }
230
231 void EventBase::waitUntilRunning() {
232   while (!isRunning()) {
233     sched_yield();
234   }
235 }
236
237 // enters the event_base loop -- will only exit when forced to
238 bool EventBase::loop() {
239   return loopBody();
240 }
241
242 bool EventBase::loopOnce() {
243   return loopBody(true);
244 }
245
246 bool EventBase::loopBody(bool once) {
247   VLOG(5) << "EventBase(): Starting loop.";
248   int res = 0;
249   bool ranLoopCallbacks;
250   int nonBlocking;
251
252   loopThread_.store(pthread_self(), std::memory_order_release);
253
254   if (!name_.empty()) {
255     setThreadName(name_);
256   }
257
258   auto prev = std::chrono::steady_clock::now();
259   int64_t idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
260     std::chrono::steady_clock::now().time_since_epoch()).count();
261
262   // TODO: Read stop_ atomically with an acquire barrier.
263   while (!stop_) {
264     ++nextLoopCnt_;
265
266     // nobody can add loop callbacks from within this thread if
267     // we don't have to handle anything to start with...
268     nonBlocking = (loopCallbacks_.empty() ? 0 : EVLOOP_NONBLOCK);
269     res = event_base_loop(evb_, EVLOOP_ONCE | nonBlocking);
270     ranLoopCallbacks = runLoopCallbacks();
271
272     int64_t busy = std::chrono::duration_cast<std::chrono::microseconds>(
273       std::chrono::steady_clock::now().time_since_epoch()).count() - startWork_;
274     int64_t idle = startWork_ - idleStart;
275
276     avgLoopTime_.addSample(idle, busy);
277     maxLatencyLoopTime_.addSample(idle, busy);
278
279     if (observer_) {
280       if (observerSampleCount_++ == observer_->getSampleRate()) {
281         observerSampleCount_ = 0;
282         observer_->loopSample(busy, idle);
283       }
284     }
285
286     VLOG(11) << "EventBase " << this         << " did not timeout "
287      " loop time guess: "    << busy + idle  <<
288      " idle time: "          << idle         <<
289      " busy time: "          << busy         <<
290      " avgLoopTime: "        << avgLoopTime_.get() <<
291      " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
292      " maxLatency_: "        << maxLatency_ <<
293      " nothingHandledYet(): "<< nothingHandledYet();
294
295     // see if our average loop time has exceeded our limit
296     if ((maxLatency_ > 0) &&
297         (maxLatencyLoopTime_.get() > double(maxLatency_))) {
298       maxLatencyCob_();
299       // back off temporarily -- don't keep spamming maxLatencyCob_
300       // if we're only a bit over the limit
301       maxLatencyLoopTime_.dampen(0.9);
302     }
303
304     // Our loop run did real work; reset the idle timer
305     idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
306       std::chrono::steady_clock::now().time_since_epoch()).count();
307
308     // If the event loop indicate that there were no more events, and
309     // we also didn't have any loop callbacks to run, there is nothing left to
310     // do.
311     if (res != 0 && !ranLoopCallbacks) {
312       // Since Notification Queue is marked 'internal' some events may not have
313       // run.  Run them manually if so, and continue looping.
314       //
315       if (getNotificationQueueSize() > 0) {
316         fnRunner_->handlerReady(0);
317       } else {
318         break;
319       }
320     }
321
322     VLOG(5) << "EventBase " << this << " loop time: " <<
323       getTimeDelta(&prev).count();
324
325     if (once) {
326       break;
327     }
328   }
329   // Reset stop_ so loop() can be called again
330   stop_ = false;
331
332   if (res < 0) {
333     LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
334     return false;
335   } else if (res == 1) {
336     VLOG(5) << "EventBase: ran out of events (exiting loop)!";
337   } else if (res > 1) {
338     LOG(ERROR) << "EventBase: unknown event loop result = " << res;
339     return false;
340   }
341
342   loopThread_.store(0, std::memory_order_release);
343
344   VLOG(5) << "EventBase(): Done with loop.";
345   return true;
346 }
347
348 void EventBase::loopForever() {
349   // Update the notification queue event to treat it as a normal (non-internal)
350   // event.  The notification queue event always remains installed, and the main
351   // loop won't exit with it installed.
352   fnRunner_->stopConsuming();
353   fnRunner_->startConsuming(this, queue_.get());
354
355   bool ret = loop();
356
357   // Restore the notification queue internal flag
358   fnRunner_->stopConsuming();
359   fnRunner_->startConsumingInternal(this, queue_.get());
360
361   if (!ret) {
362     folly::throwSystemError("error in EventBase::loopForever()");
363   }
364 }
365
366 bool EventBase::bumpHandlingTime() {
367   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
368     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
369   if(nothingHandledYet()) {
370     latestLoopCnt_ = nextLoopCnt_;
371     // set the time
372     startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
373       std::chrono::steady_clock::now().time_since_epoch()).count();
374
375     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
376       " (loop) startWork_ " << startWork_;
377     return true;
378   }
379   return false;
380 }
381
382 void EventBase::terminateLoopSoon() {
383   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
384
385   if (!isRunning()) {
386     return;
387   }
388
389   // Set stop to true, so the event loop will know to exit.
390   // TODO: We should really use an atomic operation here with a release
391   // barrier.
392   stop_ = true;
393
394   // Call event_base_loopbreak() so that libevent will exit the next time
395   // around the loop.
396   event_base_loopbreak(evb_);
397
398   // If terminateLoopSoon() is called from another thread,
399   // the EventBase thread might be stuck waiting for events.
400   // In this case, it won't wake up and notice that stop_ is set until it
401   // receives another event.  Send an empty frame to the notification queue
402   // so that the event loop will wake up even if there are no other events.
403   //
404   // We don't care about the return value of trySendFrame().  If it fails
405   // this likely means the EventBase already has lots of events waiting
406   // anyway.
407   try {
408     queue_->putMessage(std::make_pair(nullptr, nullptr));
409   } catch (...) {
410     // We don't care if putMessage() fails.  This likely means
411     // the EventBase already has lots of events waiting anyway.
412   }
413 }
414
415 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
416   DCHECK(isInEventBaseThread());
417   callback->cancelLoopCallback();
418   callback->context_ = RequestContext::saveContext();
419   if (runOnceCallbacks_ != nullptr && thisIteration) {
420     runOnceCallbacks_->push_back(*callback);
421   } else {
422     loopCallbacks_.push_back(*callback);
423   }
424 }
425
426 void EventBase::runInLoop(const Cob& cob, bool thisIteration) {
427   DCHECK(isInEventBaseThread());
428   auto wrapper = new FunctionLoopCallback<Cob>(cob);
429   wrapper->context_ = RequestContext::saveContext();
430   if (runOnceCallbacks_ != nullptr && thisIteration) {
431     runOnceCallbacks_->push_back(*wrapper);
432   } else {
433     loopCallbacks_.push_back(*wrapper);
434   }
435 }
436
437 void EventBase::runInLoop(Cob&& cob, bool thisIteration) {
438   DCHECK(isInEventBaseThread());
439   auto wrapper = new FunctionLoopCallback<Cob>(std::move(cob));
440   wrapper->context_ = RequestContext::saveContext();
441   if (runOnceCallbacks_ != nullptr && thisIteration) {
442     runOnceCallbacks_->push_back(*wrapper);
443   } else {
444     loopCallbacks_.push_back(*wrapper);
445   }
446 }
447
448 bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
449   // Send the message.
450   // It will be received by the FunctionRunner in the EventBase's thread.
451
452   // We try not to schedule nullptr callbacks
453   if (!fn) {
454     LOG(ERROR) << "EventBase " << this
455                << ": Scheduling nullptr callbacks is not allowed";
456     return false;
457   }
458
459   // Short-circuit if we are already in our event base
460   if (inRunningEventBaseThread()) {
461     runInLoop(new RunInLoopCallback(fn, arg));
462     return true;
463
464   }
465
466   try {
467     queue_->putMessage(std::make_pair(fn, arg));
468   } catch (const std::exception& ex) {
469     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
470                << fn << "for EventBase thread: " << ex.what();
471     return false;
472   }
473
474   return true;
475 }
476
477 bool EventBase::runInEventBaseThread(const Cob& fn) {
478   // Short-circuit if we are already in our event base
479   if (inRunningEventBaseThread()) {
480     runInLoop(fn);
481     return true;
482   }
483
484   Cob* fnCopy;
485   // Allocate a copy of the function so we can pass it to the other thread
486   // The other thread will delete this copy once the function has been run
487   try {
488     fnCopy = new Cob(fn);
489   } catch (const std::bad_alloc& ex) {
490     LOG(ERROR) << "failed to allocate tr::function copy "
491                << "for runInEventBaseThread()";
492     return false;
493   }
494
495   if (!runInEventBaseThread(&EventBase::runFunctionPtr, fnCopy)) {
496     delete fnCopy;
497     return false;
498   }
499
500   return true;
501 }
502
503 bool EventBase::runAfterDelay(const Cob& cob,
504                                int milliseconds,
505                                TimeoutManager::InternalEnum in) {
506   CobTimeout* timeout = new CobTimeout(this, cob, in);
507   if (!timeout->scheduleTimeout(milliseconds)) {
508     delete timeout;
509     return false;
510   }
511
512   pendingCobTimeouts_.push_back(*timeout);
513   return true;
514 }
515
516 bool EventBase::runLoopCallbacks(bool setContext) {
517   if (!loopCallbacks_.empty()) {
518     bumpHandlingTime();
519     // Swap the loopCallbacks_ list with a temporary list on our stack.
520     // This way we will only run callbacks scheduled at the time
521     // runLoopCallbacks() was invoked.
522     //
523     // If any of these callbacks in turn call runInLoop() to schedule more
524     // callbacks, those new callbacks won't be run until the next iteration
525     // around the event loop.  This prevents runInLoop() callbacks from being
526     // able to start file descriptor and timeout based events.
527     LoopCallbackList currentCallbacks;
528     currentCallbacks.swap(loopCallbacks_);
529     runOnceCallbacks_ = &currentCallbacks;
530
531     while (!currentCallbacks.empty()) {
532       LoopCallback* callback = &currentCallbacks.front();
533       currentCallbacks.pop_front();
534       if (setContext) {
535         RequestContext::setContext(callback->context_);
536       }
537       callback->runLoopCallback();
538     }
539
540     runOnceCallbacks_ = nullptr;
541     return true;
542   }
543   return false;
544 }
545
546 void EventBase::initNotificationQueue() {
547   // Infinite size queue
548   queue_.reset(new NotificationQueue<std::pair<void (*)(void*), void*>>());
549
550   // We allocate fnRunner_ separately, rather than declaring it directly
551   // as a member of EventBase solely so that we don't need to include
552   // NotificationQueue.h from EventBase.h
553   fnRunner_.reset(new FunctionRunner());
554
555   // Mark this as an internal event, so event_base_loop() will return if
556   // there are no other events besides this one installed.
557   //
558   // Most callers don't care about the internal notification queue used by
559   // EventBase.  The queue is always installed, so if we did count the queue as
560   // an active event, loop() would never exit with no more events to process.
561   // Users can use loopForever() if they do care about the notification queue.
562   // (This is useful for EventBase threads that do nothing but process
563   // runInEventBaseThread() notifications.)
564   fnRunner_->startConsumingInternal(this, queue_.get());
565 }
566
567 void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) {
568   expCoeff_ = -1.0/timeInterval;
569   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
570 }
571
572 void EventBase::SmoothLoopTime::reset(double value) {
573   value_ = value;
574 }
575
576 void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) {
577     /*
578      * Position at which the busy sample is considered to be taken.
579      * (Allows to quickly skew our average without editing much code)
580      */
581     enum BusySamplePosition {
582       RIGHT = 0,  // busy sample placed at the end of the iteration
583       CENTER = 1, // busy sample placed at the middle point of the iteration
584       LEFT = 2,   // busy sample placed at the beginning of the iteration
585     };
586
587   VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ <<
588               " idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ <<
589               " busy " << busy << " " << __PRETTY_FUNCTION__;
590   idle += oldBusyLeftover_ + busy;
591   oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
592   idle -= oldBusyLeftover_;
593
594   double coeff = exp(idle * expCoeff_);
595   value_ *= coeff;
596   value_ += (1.0 - coeff) * busy;
597 }
598
599 bool EventBase::nothingHandledYet() {
600   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
601   return (nextLoopCnt_ != latestLoopCnt_);
602 }
603
604 /* static */
605 void EventBase::runFunctionPtr(Cob* fn) {
606   // The function should never throw an exception, because we have no
607   // way of knowing what sort of error handling to perform.
608   //
609   // If it does throw, log a message and abort the program.
610   try {
611     (*fn)();
612   } catch (const std::exception &ex) {
613     LOG(ERROR) << "runInEventBaseThread() std::function threw a "
614                << typeid(ex).name() << " exception: " << ex.what();
615     abort();
616   } catch (...) {
617     LOG(ERROR) << "runInEventBaseThread() std::function threw an exception";
618     abort();
619   }
620
621   // The function object was allocated by runInEventBaseThread().
622   // Delete it once it has been run.
623   delete fn;
624 }
625
626 EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg)
627     : fn_(fn)
628     , arg_(arg) {}
629
630 void EventBase::RunInLoopCallback::runLoopCallback() noexcept {
631   fn_(arg_);
632   delete this;
633 }
634
635 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
636                                       InternalEnum internal) {
637
638   struct event* ev = obj->getEvent();
639   assert(ev->ev_base == nullptr);
640
641   event_base_set(getLibeventBase(), ev);
642   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
643     // Set the EVLIST_INTERNAL flag
644     ev->ev_flags |= EVLIST_INTERNAL;
645   }
646 }
647
648 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
649   cancelTimeout(obj);
650   struct event* ev = obj->getEvent();
651   ev->ev_base = nullptr;
652 }
653
654 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
655                                  std::chrono::milliseconds timeout) {
656   assert(isInEventBaseThread());
657   // Set up the timeval and add the event
658   struct timeval tv;
659   tv.tv_sec = timeout.count() / 1000LL;
660   tv.tv_usec = (timeout.count() % 1000LL) * 1000LL;
661
662   struct event* ev = obj->getEvent();
663   if (event_add(ev, &tv) < 0) {
664     LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
665     return false;
666   }
667
668   return true;
669 }
670
671 void EventBase::cancelTimeout(AsyncTimeout* obj) {
672   assert(isInEventBaseThread());
673   struct event* ev = obj->getEvent();
674   if (EventUtil::isEventRegistered(ev)) {
675     event_del(ev);
676   }
677 }
678
679 void EventBase::setName(const std::string& name) {
680   assert(isInEventBaseThread());
681   name_ = name;
682
683   if (isRunning()) {
684     setThreadName(loopThread_.load(std::memory_order_relaxed),
685                   name_);
686   }
687 }
688
689 const std::string& EventBase::getName() {
690   assert(isInEventBaseThread());
691   return name_;
692 }
693
694 } // folly