932a63ac5e75abe7bf104a43e18d0b2477db0850
[folly.git] / folly / io / async / EventBase.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef __STDC_FORMAT_MACROS
18 #define __STDC_FORMAT_MACROS
19 #endif
20
21 #include "folly/io/async/EventBase.h"
22
23 #include "folly/ThreadName.h"
24 #include "folly/io/async/NotificationQueue.h"
25
26 #include <boost/static_assert.hpp>
27 #include <fcntl.h>
28 #include <pthread.h>
29 #include <unistd.h>
30
31 namespace {
32
33 using folly::Cob;
34 using folly::EventBase;
35
36 template <typename Callback>
37 class FunctionLoopCallback : public EventBase::LoopCallback {
38  public:
39   explicit FunctionLoopCallback(Cob&& function)
40       : function_(std::move(function)) {}
41
42   explicit FunctionLoopCallback(const Cob& function)
43       : function_(function) {}
44
45   virtual void runLoopCallback() noexcept {
46     function_();
47     delete this;
48   }
49
50  private:
51   Callback function_;
52 };
53
54 }
55
56 namespace folly {
57
58 const int kNoFD = -1;
59
60 /*
61  * EventBase::FunctionRunner
62  */
63
64 class EventBase::FunctionRunner
65     : public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
66  public:
67   void messageAvailable(std::pair<void (*)(void*), void*>&& msg) {
68
69     // In libevent2, internal events do not break the loop.
70     // Most users would expect loop(), followed by runInEventBaseThread(),
71     // to break the loop and check if it should exit or not.
72     // To have similar bejaviour to libevent1.4, tell the loop to break here.
73     // Note that loop() may still continue to loop, but it will also check the
74     // stop_ flag as well as runInLoop callbacks, etc.
75     event_base_loopbreak(getEventBase()->evb_);
76
77     if (msg.first == nullptr && msg.second == nullptr) {
78       // terminateLoopSoon() sends a null message just to
79       // wake up the loop.  We can ignore these messages.
80       return;
81     }
82
83     // If function is nullptr, just log and move on
84     if (!msg.first) {
85       LOG(ERROR) << "nullptr callback registered to be run in "
86                  << "event base thread";
87       return;
88     }
89
90     // The function should never throw an exception, because we have no
91     // way of knowing what sort of error handling to perform.
92     //
93     // If it does throw, log a message and abort the program.
94     try {
95       msg.first(msg.second);
96     } catch (const std::exception& ex) {
97       LOG(ERROR) << "runInEventBaseThread() function threw a "
98                  << typeid(ex).name() << " exception: " << ex.what();
99       abort();
100     } catch (...) {
101       LOG(ERROR) << "runInEventBaseThread() function threw an exception";
102       abort();
103     }
104   }
105 };
106
107 /*
108  * EventBase::CobTimeout methods
109  */
110
111 void EventBase::CobTimeout::timeoutExpired() noexcept {
112   // For now, we just swallow any exceptions that the callback threw.
113   try {
114     cob_();
115   } catch (const std::exception& ex) {
116     LOG(ERROR) << "EventBase::runAfterDelay() callback threw "
117                << typeid(ex).name() << " exception: " << ex.what();
118   } catch (...) {
119     LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception "
120                << "type";
121   }
122
123   // The CobTimeout object was allocated on the heap by runAfterDelay(),
124   // so delete it now that the it has fired.
125   delete this;
126 }
127
128 /*
129  * EventBase methods
130  */
131
132 EventBase::EventBase()
133   : runOnceCallbacks_(nullptr)
134   , stop_(false)
135   , loopThread_(0)
136   , evb_(static_cast<event_base*>(event_init()))
137   , queue_(nullptr)
138   , fnRunner_(nullptr)
139   , maxLatency_(0)
140   , avgLoopTime_(2000000)
141   , maxLatencyLoopTime_(avgLoopTime_)
142   , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
143   , latestLoopCnt_(nextLoopCnt_)
144   , startWork_(0)
145   , observer_(nullptr)
146   , observerSampleCount_(0) {
147   VLOG(5) << "EventBase(): Created.";
148   initNotificationQueue();
149   RequestContext::getStaticContext();
150 }
151
152 // takes ownership of the event_base
153 EventBase::EventBase(event_base* evb)
154   : runOnceCallbacks_(nullptr)
155   , stop_(false)
156   , loopThread_(0)
157   , evb_(evb)
158   , queue_(nullptr)
159   , fnRunner_(nullptr)
160   , maxLatency_(0)
161   , avgLoopTime_(2000000)
162   , maxLatencyLoopTime_(avgLoopTime_)
163   , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
164   , latestLoopCnt_(nextLoopCnt_)
165   , startWork_(0)
166   , observer_(nullptr)
167   , observerSampleCount_(0) {
168   initNotificationQueue();
169   RequestContext::getStaticContext();
170 }
171
172 EventBase::~EventBase() {
173   // Delete any unfired CobTimeout objects, so that we don't leak memory
174   // (Note that we don't fire them.  The caller is responsible for cleaning up
175   // its own data structures if it destroys the EventBase with unfired events
176   // remaining.)
177   while (!pendingCobTimeouts_.empty()) {
178     CobTimeout* timeout = &pendingCobTimeouts_.front();
179     delete timeout;
180   }
181
182   (void) runLoopCallbacks(false);
183
184   // Stop consumer before deleting NotificationQueue
185   fnRunner_->stopConsuming();
186   event_base_free(evb_);
187   VLOG(5) << "EventBase(): Destroyed.";
188 }
189
190 int EventBase::getNotificationQueueSize() const {
191   return queue_->size();
192 }
193
194 void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
195   fnRunner_->setMaxReadAtOnce(maxAtOnce);
196 }
197
198 // Set smoothing coefficient for loop load average; input is # of milliseconds
199 // for exp(-1) decay.
200 void EventBase::setLoadAvgMsec(uint32_t ms) {
201   uint64_t us = 1000 * ms;
202   if (ms > 0) {
203     maxLatencyLoopTime_.setTimeInterval(us);
204     avgLoopTime_.setTimeInterval(us);
205   } else {
206     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
207   }
208 }
209
210 void EventBase::resetLoadAvg(double value) {
211   avgLoopTime_.reset(value);
212   maxLatencyLoopTime_.reset(value);
213 }
214
215 static std::chrono::milliseconds
216 getTimeDelta(std::chrono::steady_clock::time_point* prev) {
217   auto result = std::chrono::steady_clock::now() - *prev;
218   *prev = std::chrono::steady_clock::now();
219
220   return std::chrono::duration_cast<std::chrono::milliseconds>(result);
221 }
222
223 void EventBase::waitUntilRunning() {
224   while (!isRunning()) {
225     sched_yield();
226   }
227 }
228
229 // enters the event_base loop -- will only exit when forced to
230 bool EventBase::loop() {
231   return loopBody();
232 }
233
234 bool EventBase::loopOnce() {
235   return loopBody(true);
236 }
237
238 bool EventBase::loopBody(bool once) {
239   VLOG(5) << "EventBase(): Starting loop.";
240   int res = 0;
241   bool ranLoopCallbacks;
242   int nonBlocking;
243
244   loopThread_.store(pthread_self(), std::memory_order_release);
245
246   if (!name_.empty()) {
247     setThreadName(name_);
248   }
249
250   auto prev = std::chrono::steady_clock::now();
251   int64_t idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
252     std::chrono::steady_clock::now().time_since_epoch()).count();
253
254   // TODO: Read stop_ atomically with an acquire barrier.
255   while (!stop_) {
256     ++nextLoopCnt_;
257
258     // nobody can add loop callbacks from within this thread if
259     // we don't have to handle anything to start with...
260     nonBlocking = (loopCallbacks_.empty() ? 0 : EVLOOP_NONBLOCK);
261     res = event_base_loop(evb_, EVLOOP_ONCE | nonBlocking);
262     ranLoopCallbacks = runLoopCallbacks();
263
264     int64_t busy = std::chrono::duration_cast<std::chrono::microseconds>(
265       std::chrono::steady_clock::now().time_since_epoch()).count() - startWork_;
266     int64_t idle = startWork_ - idleStart;
267
268     avgLoopTime_.addSample(idle, busy);
269     maxLatencyLoopTime_.addSample(idle, busy);
270
271     if (observer_) {
272       if (observerSampleCount_++ == observer_->getSampleRate()) {
273         observerSampleCount_ = 0;
274         observer_->loopSample(busy, idle);
275       }
276     }
277
278     VLOG(11) << "EventBase " << this         << " did not timeout "
279      " loop time guess: "    << busy + idle  <<
280      " idle time: "          << idle         <<
281      " busy time: "          << busy         <<
282      " avgLoopTime: "        << avgLoopTime_.get() <<
283      " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
284      " maxLatency_: "        << maxLatency_ <<
285      " nothingHandledYet(): "<< nothingHandledYet();
286
287     // see if our average loop time has exceeded our limit
288     if ((maxLatency_ > 0) &&
289         (maxLatencyLoopTime_.get() > double(maxLatency_))) {
290       maxLatencyCob_();
291       // back off temporarily -- don't keep spamming maxLatencyCob_
292       // if we're only a bit over the limit
293       maxLatencyLoopTime_.dampen(0.9);
294     }
295
296     // Our loop run did real work; reset the idle timer
297     idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
298       std::chrono::steady_clock::now().time_since_epoch()).count();
299
300     // If the event loop indicate that there were no more events, and
301     // we also didn't have any loop callbacks to run, there is nothing left to
302     // do.
303     if (res != 0 && !ranLoopCallbacks) {
304       // Since Notification Queue is marked 'internal' some events may not have
305       // run.  Run them manually if so, and continue looping.
306       //
307       if (getNotificationQueueSize() > 0) {
308         fnRunner_->handlerReady(0);
309       } else {
310         break;
311       }
312     }
313
314     VLOG(5) << "EventBase " << this << " loop time: " <<
315       getTimeDelta(&prev).count();
316
317     if (once) {
318       break;
319     }
320   }
321   // Reset stop_ so loop() can be called again
322   stop_ = false;
323
324   if (res < 0) {
325     LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
326     return false;
327   } else if (res == 1) {
328     VLOG(5) << "EventBase: ran out of events (exiting loop)!";
329   } else if (res > 1) {
330     LOG(ERROR) << "EventBase: unknown event loop result = " << res;
331     return false;
332   }
333
334   loopThread_.store(0, std::memory_order_release);
335
336   VLOG(5) << "EventBase(): Done with loop.";
337   return true;
338 }
339
340 void EventBase::loopForever() {
341   // Update the notification queue event to treat it as a normal (non-internal)
342   // event.  The notification queue event always remains installed, and the main
343   // loop won't exit with it installed.
344   fnRunner_->stopConsuming();
345   fnRunner_->startConsuming(this, queue_.get());
346
347   bool ret = loop();
348
349   // Restore the notification queue internal flag
350   fnRunner_->stopConsuming();
351   fnRunner_->startConsumingInternal(this, queue_.get());
352
353   if (!ret) {
354     folly::throwSystemError("error in EventBase::loopForever()");
355   }
356 }
357
358 bool EventBase::bumpHandlingTime() {
359   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
360     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
361   if(nothingHandledYet()) {
362     latestLoopCnt_ = nextLoopCnt_;
363     // set the time
364     startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
365       std::chrono::steady_clock::now().time_since_epoch()).count();
366
367     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
368       " (loop) startWork_ " << startWork_;
369     return true;
370   }
371   return false;
372 }
373
374 void EventBase::terminateLoopSoon() {
375   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
376
377   if (!isRunning()) {
378     return;
379   }
380
381   // Set stop to true, so the event loop will know to exit.
382   // TODO: We should really use an atomic operation here with a release
383   // barrier.
384   stop_ = true;
385
386   // Call event_base_loopbreak() so that libevent will exit the next time
387   // around the loop.
388   event_base_loopbreak(evb_);
389
390   // If terminateLoopSoon() is called from another thread,
391   // the EventBase thread might be stuck waiting for events.
392   // In this case, it won't wake up and notice that stop_ is set until it
393   // receives another event.  Send an empty frame to the notification queue
394   // so that the event loop will wake up even if there are no other events.
395   //
396   // We don't care about the return value of trySendFrame().  If it fails
397   // this likely means the EventBase already has lots of events waiting
398   // anyway.
399   try {
400     queue_->putMessage(std::make_pair(nullptr, nullptr));
401   } catch (...) {
402     // We don't care if putMessage() fails.  This likely means
403     // the EventBase already has lots of events waiting anyway.
404   }
405 }
406
407 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
408   DCHECK(isInEventBaseThread());
409   callback->cancelLoopCallback();
410   callback->context_ = RequestContext::saveContext();
411   if (runOnceCallbacks_ != nullptr && thisIteration) {
412     runOnceCallbacks_->push_back(*callback);
413   } else {
414     loopCallbacks_.push_back(*callback);
415   }
416 }
417
418 void EventBase::runInLoop(const Cob& cob, bool thisIteration) {
419   DCHECK(isInEventBaseThread());
420   auto wrapper = new FunctionLoopCallback<Cob>(cob);
421   wrapper->context_ = RequestContext::saveContext();
422   if (runOnceCallbacks_ != nullptr && thisIteration) {
423     runOnceCallbacks_->push_back(*wrapper);
424   } else {
425     loopCallbacks_.push_back(*wrapper);
426   }
427 }
428
429 void EventBase::runInLoop(Cob&& cob, bool thisIteration) {
430   DCHECK(isInEventBaseThread());
431   auto wrapper = new FunctionLoopCallback<Cob>(std::move(cob));
432   wrapper->context_ = RequestContext::saveContext();
433   if (runOnceCallbacks_ != nullptr && thisIteration) {
434     runOnceCallbacks_->push_back(*wrapper);
435   } else {
436     loopCallbacks_.push_back(*wrapper);
437   }
438 }
439
440 bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
441   // Send the message.
442   // It will be received by the FunctionRunner in the EventBase's thread.
443
444   // We try not to schedule nullptr callbacks
445   if (!fn) {
446     LOG(ERROR) << "EventBase " << this
447                << ": Scheduling nullptr callbacks is not allowed";
448     return false;
449   }
450
451   // Short-circuit if we are already in our event base
452   if (inRunningEventBaseThread()) {
453     runInLoop(new RunInLoopCallback(fn, arg));
454     return true;
455
456   }
457
458   try {
459     queue_->putMessage(std::make_pair(fn, arg));
460   } catch (const std::exception& ex) {
461     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
462                << fn << "for EventBase thread: " << ex.what();
463     return false;
464   }
465
466   return true;
467 }
468
469 bool EventBase::runInEventBaseThread(const Cob& fn) {
470   // Short-circuit if we are already in our event base
471   if (inRunningEventBaseThread()) {
472     runInLoop(fn);
473     return true;
474   }
475
476   Cob* fnCopy;
477   // Allocate a copy of the function so we can pass it to the other thread
478   // The other thread will delete this copy once the function has been run
479   try {
480     fnCopy = new Cob(fn);
481   } catch (const std::bad_alloc& ex) {
482     LOG(ERROR) << "failed to allocate tr::function copy "
483                << "for runInEventBaseThread()";
484     return false;
485   }
486
487   if (!runInEventBaseThread(&EventBase::runFunctionPtr, fnCopy)) {
488     delete fnCopy;
489     return false;
490   }
491
492   return true;
493 }
494
495 bool EventBase::runAfterDelay(const Cob& cob,
496                                int milliseconds,
497                                TimeoutManager::InternalEnum in) {
498   CobTimeout* timeout = new CobTimeout(this, cob, in);
499   if (!timeout->scheduleTimeout(milliseconds)) {
500     delete timeout;
501     return false;
502   }
503
504   pendingCobTimeouts_.push_back(*timeout);
505   return true;
506 }
507
508 bool EventBase::runLoopCallbacks(bool setContext) {
509   if (!loopCallbacks_.empty()) {
510     bumpHandlingTime();
511     // Swap the loopCallbacks_ list with a temporary list on our stack.
512     // This way we will only run callbacks scheduled at the time
513     // runLoopCallbacks() was invoked.
514     //
515     // If any of these callbacks in turn call runInLoop() to schedule more
516     // callbacks, those new callbacks won't be run until the next iteration
517     // around the event loop.  This prevents runInLoop() callbacks from being
518     // able to start file descriptor and timeout based events.
519     LoopCallbackList currentCallbacks;
520     currentCallbacks.swap(loopCallbacks_);
521     runOnceCallbacks_ = &currentCallbacks;
522
523     while (!currentCallbacks.empty()) {
524       LoopCallback* callback = &currentCallbacks.front();
525       currentCallbacks.pop_front();
526       if (setContext) {
527         RequestContext::setContext(callback->context_);
528       }
529       callback->runLoopCallback();
530     }
531
532     runOnceCallbacks_ = nullptr;
533     return true;
534   }
535   return false;
536 }
537
538 void EventBase::initNotificationQueue() {
539   // Infinite size queue
540   queue_.reset(new NotificationQueue<std::pair<void (*)(void*), void*>>());
541
542   // We allocate fnRunner_ separately, rather than declaring it directly
543   // as a member of EventBase solely so that we don't need to include
544   // NotificationQueue.h from EventBase.h
545   fnRunner_.reset(new FunctionRunner());
546
547   // Mark this as an internal event, so event_base_loop() will return if
548   // there are no other events besides this one installed.
549   //
550   // Most callers don't care about the internal notification queue used by
551   // EventBase.  The queue is always installed, so if we did count the queue as
552   // an active event, loop() would never exit with no more events to process.
553   // Users can use loopForever() if they do care about the notification queue.
554   // (This is useful for EventBase threads that do nothing but process
555   // runInEventBaseThread() notifications.)
556   fnRunner_->startConsumingInternal(this, queue_.get());
557 }
558
559 void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) {
560   expCoeff_ = -1.0/timeInterval;
561   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
562 }
563
564 void EventBase::SmoothLoopTime::reset(double value) {
565   value_ = value;
566 }
567
568 void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) {
569     /*
570      * Position at which the busy sample is considered to be taken.
571      * (Allows to quickly skew our average without editing much code)
572      */
573     enum BusySamplePosition {
574       RIGHT = 0,  // busy sample placed at the end of the iteration
575       CENTER = 1, // busy sample placed at the middle point of the iteration
576       LEFT = 2,   // busy sample placed at the beginning of the iteration
577     };
578
579   VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ <<
580               " idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ <<
581               " busy " << busy << " " << __PRETTY_FUNCTION__;
582   idle += oldBusyLeftover_ + busy;
583   oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
584   idle -= oldBusyLeftover_;
585
586   double coeff = exp(idle * expCoeff_);
587   value_ *= coeff;
588   value_ += (1.0 - coeff) * busy;
589 }
590
591 bool EventBase::nothingHandledYet() {
592   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
593   return (nextLoopCnt_ != latestLoopCnt_);
594 }
595
596 /* static */
597 void EventBase::runFunctionPtr(Cob* fn) {
598   // The function should never throw an exception, because we have no
599   // way of knowing what sort of error handling to perform.
600   //
601   // If it does throw, log a message and abort the program.
602   try {
603     (*fn)();
604   } catch (const std::exception &ex) {
605     LOG(ERROR) << "runInEventBaseThread() std::function threw a "
606                << typeid(ex).name() << " exception: " << ex.what();
607     abort();
608   } catch (...) {
609     LOG(ERROR) << "runInEventBaseThread() std::function threw an exception";
610     abort();
611   }
612
613   // The function object was allocated by runInEventBaseThread().
614   // Delete it once it has been run.
615   delete fn;
616 }
617
618 EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg)
619     : fn_(fn)
620     , arg_(arg) {}
621
622 void EventBase::RunInLoopCallback::runLoopCallback() noexcept {
623   fn_(arg_);
624   delete this;
625 }
626
627 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
628                                       InternalEnum internal) {
629
630   struct event* ev = obj->getEvent();
631   assert(ev->ev_base == nullptr);
632
633   event_base_set(getLibeventBase(), ev);
634   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
635     // Set the EVLIST_INTERNAL flag
636     ev->ev_flags |= EVLIST_INTERNAL;
637   }
638 }
639
640 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
641   cancelTimeout(obj);
642   struct event* ev = obj->getEvent();
643   ev->ev_base = nullptr;
644 }
645
646 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
647                                  std::chrono::milliseconds timeout) {
648   assert(isInEventBaseThread());
649   // Set up the timeval and add the event
650   struct timeval tv;
651   tv.tv_sec = timeout.count() / 1000LL;
652   tv.tv_usec = (timeout.count() % 1000LL) * 1000LL;
653
654   struct event* ev = obj->getEvent();
655   if (event_add(ev, &tv) < 0) {
656     LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
657     return false;
658   }
659
660   return true;
661 }
662
663 void EventBase::cancelTimeout(AsyncTimeout* obj) {
664   assert(isInEventBaseThread());
665   struct event* ev = obj->getEvent();
666   if (EventUtil::isEventRegistered(ev)) {
667     event_del(ev);
668   }
669 }
670
671 void EventBase::setName(const std::string& name) {
672   assert(isInEventBaseThread());
673   name_ = name;
674
675   if (isRunning()) {
676     setThreadName(loopThread_.load(std::memory_order_relaxed),
677                   name_);
678   }
679 }
680
681 const std::string& EventBase::getName() {
682   assert(isInEventBaseThread());
683   return name_;
684 }
685
686 } // folly