Move thrift/lib/cpp/async to folly.
[folly.git] / folly / io / async / EventBase.cpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19
20 #ifndef __STDC_FORMAT_MACROS
21 #define __STDC_FORMAT_MACROS
22 #endif
23
24 #include "folly/io/async/EventBase.h"
25
26 #include "folly/io/async/NotificationQueue.h"
27
28 #include <boost/static_assert.hpp>
29 #include <fcntl.h>
30 #include <pthread.h>
31 #include <unistd.h>
32
33 namespace {
34
35 using folly::Cob;
36 using folly::EventBase;
37
38 class Tr1FunctionLoopCallback : public EventBase::LoopCallback {
39  public:
40   explicit Tr1FunctionLoopCallback(const Cob& function)
41     : function_(function) {}
42
43   virtual void runLoopCallback() noexcept {
44     function_();
45     delete this;
46   }
47
48  private:
49   Cob function_;
50 };
51
52 }
53
54 namespace folly {
55
56 const int kNoFD = -1;
57
58 /*
59  * EventBase::FunctionRunner
60  */
61
62 class EventBase::FunctionRunner
63     : public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
64  public:
65   void messageAvailable(std::pair<void (*)(void*), void*>&& msg) {
66
67     // In libevent2, internal events do not break the loop.
68     // Most users would expect loop(), followed by runInEventBaseThread(),
69     // to break the loop and check if it should exit or not.
70     // To have similar bejaviour to libevent1.4, tell the loop to break here.
71     // Note that loop() may still continue to loop, but it will also check the
72     // stop_ flag as well as runInLoop callbacks, etc.
73     event_base_loopbreak(getEventBase()->evb_);
74
75     if (msg.first == nullptr && msg.second == nullptr) {
76       // terminateLoopSoon() sends a null message just to
77       // wake up the loop.  We can ignore these messages.
78       return;
79     }
80
81     // If function is nullptr, just log and move on
82     if (!msg.first) {
83       LOG(ERROR) << "nullptr callback registered to be run in "
84                  << "event base thread";
85       return;
86     }
87
88     // The function should never throw an exception, because we have no
89     // way of knowing what sort of error handling to perform.
90     //
91     // If it does throw, log a message and abort the program.
92     try {
93       msg.first(msg.second);
94     } catch (const std::exception& ex) {
95       LOG(ERROR) << "runInEventBaseThread() function threw a "
96                  << typeid(ex).name() << " exception: " << ex.what();
97       abort();
98     } catch (...) {
99       LOG(ERROR) << "runInEventBaseThread() function threw an exception";
100       abort();
101     }
102   }
103 };
104
105 /*
106  * EventBase::CobTimeout methods
107  */
108
109 void EventBase::CobTimeout::timeoutExpired() noexcept {
110   // For now, we just swallow any exceptions that the callback threw.
111   try {
112     cob_();
113   } catch (const std::exception& ex) {
114     LOG(ERROR) << "EventBase::runAfterDelay() callback threw "
115                << typeid(ex).name() << " exception: " << ex.what();
116   } catch (...) {
117     LOG(ERROR) << "EventBase::runAfterDelay() callback threw non-exception "
118                << "type";
119   }
120
121   // The CobTimeout object was allocated on the heap by runAfterDelay(),
122   // so delete it now that the it has fired.
123   delete this;
124 }
125
126 /*
127  * EventBase methods
128  */
129
130 EventBase::EventBase()
131   : runOnceCallbacks_(nullptr)
132   , stop_(false)
133   , loopThread_(0)
134   , evb_(static_cast<event_base*>(event_init()))
135   , queue_(nullptr)
136   , fnRunner_(nullptr)
137   , maxLatency_(0)
138   , avgLoopTime_(2000000)
139   , maxLatencyLoopTime_(avgLoopTime_)
140   , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
141   , latestLoopCnt_(nextLoopCnt_)
142   , startWork_(0)
143   , observer_(nullptr)
144   , observerSampleCount_(0) {
145   VLOG(5) << "EventBase(): Created.";
146   initNotificationQueue();
147   RequestContext::getStaticContext();
148 }
149
150 // takes ownership of the event_base
151 EventBase::EventBase(event_base* evb)
152   : runOnceCallbacks_(nullptr)
153   , stop_(false)
154   , loopThread_(0)
155   , evb_(evb)
156   , queue_(nullptr)
157   , fnRunner_(nullptr)
158   , maxLatency_(0)
159   , avgLoopTime_(2000000)
160   , maxLatencyLoopTime_(avgLoopTime_)
161   , nextLoopCnt_(-40)       // Early wrap-around so bugs will manifest soon
162   , latestLoopCnt_(nextLoopCnt_)
163   , startWork_(0)
164   , observer_(nullptr)
165   , observerSampleCount_(0) {
166   initNotificationQueue();
167   RequestContext::getStaticContext();
168 }
169
170 EventBase::~EventBase() {
171   // Delete any unfired CobTimeout objects, so that we don't leak memory
172   // (Note that we don't fire them.  The caller is responsible for cleaning up
173   // its own data structures if it destroys the EventBase with unfired events
174   // remaining.)
175   while (!pendingCobTimeouts_.empty()) {
176     CobTimeout* timeout = &pendingCobTimeouts_.front();
177     delete timeout;
178   }
179
180   (void) runLoopCallbacks(false);
181
182   // Stop consumer before deleting NotificationQueue
183   fnRunner_->stopConsuming();
184   event_base_free(evb_);
185   VLOG(5) << "EventBase(): Destroyed.";
186 }
187
188 int EventBase::getNotificationQueueSize() const {
189   return queue_->size();
190 }
191
192 // Set smoothing coefficient for loop load average; input is # of milliseconds
193 // for exp(-1) decay.
194 void EventBase::setLoadAvgMsec(uint32_t ms) {
195   uint64_t us = 1000 * ms;
196   if (ms > 0) {
197     maxLatencyLoopTime_.setTimeInterval(us);
198     avgLoopTime_.setTimeInterval(us);
199   } else {
200     LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
201   }
202 }
203
204 void EventBase::resetLoadAvg(double value) {
205   avgLoopTime_.reset(value);
206   maxLatencyLoopTime_.reset(value);
207 }
208
209 static int64_t getTimeDelta(int64_t *prev) {
210   int64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
211     std::chrono::steady_clock::now().time_since_epoch()).count();
212   int64_t delta = now - *prev;
213   *prev = now;
214   return delta;
215 }
216
217 void EventBase::waitUntilRunning() {
218   while (!isRunning()) {
219     sched_yield();
220   }
221 }
222
223 // enters the event_base loop -- will only exit when forced to
224 bool EventBase::loop() {
225   VLOG(5) << "EventBase(): Starting loop.";
226   int res = 0;
227   bool ranLoopCallbacks;
228   int nonBlocking;
229
230   loopThread_.store(pthread_self(), std::memory_order_release);
231
232 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
233   if (!name_.empty()) {
234     pthread_setname_np(pthread_self(), name_.c_str());
235   }
236 #endif
237
238   int64_t prev = std::chrono::duration_cast<std::chrono::milliseconds>(
239     std::chrono::steady_clock::now().time_since_epoch()).count();
240   int64_t idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
241     std::chrono::steady_clock::now().time_since_epoch()).count();
242
243   // TODO: Read stop_ atomically with an acquire barrier.
244   while (!stop_) {
245     ++nextLoopCnt_;
246
247     // nobody can add loop callbacks from within this thread if
248     // we don't have to handle anything to start with...
249     nonBlocking = (loopCallbacks_.empty() ? 0 : EVLOOP_NONBLOCK);
250     res = event_base_loop(evb_, EVLOOP_ONCE | nonBlocking);
251     ranLoopCallbacks = runLoopCallbacks();
252
253     int64_t busy = std::chrono::duration_cast<std::chrono::microseconds>(
254       std::chrono::steady_clock::now().time_since_epoch()).count() - startWork_;
255     int64_t idle = startWork_ - idleStart;
256
257     avgLoopTime_.addSample(idle, busy);
258     maxLatencyLoopTime_.addSample(idle, busy);
259
260     if (observer_) {
261       if (observerSampleCount_++ == observer_->getSampleRate()) {
262         observerSampleCount_ = 0;
263         observer_->loopSample(busy, idle);
264       }
265     }
266
267     VLOG(11) << "EventBase " << this         << " did not timeout "
268      " loop time guess: "    << busy + idle  <<
269      " idle time: "          << idle         <<
270      " busy time: "          << busy         <<
271      " avgLoopTime: "        << avgLoopTime_.get() <<
272      " maxLatencyLoopTime: " << maxLatencyLoopTime_.get() <<
273      " maxLatency_: "        << maxLatency_ <<
274      " nothingHandledYet(): "<< nothingHandledYet();
275
276     // see if our average loop time has exceeded our limit
277     if ((maxLatency_ > 0) &&
278         (maxLatencyLoopTime_.get() > double(maxLatency_))) {
279       maxLatencyCob_();
280       // back off temporarily -- don't keep spamming maxLatencyCob_
281       // if we're only a bit over the limit
282       maxLatencyLoopTime_.dampen(0.9);
283     }
284
285     // Our loop run did real work; reset the idle timer
286     idleStart = std::chrono::duration_cast<std::chrono::microseconds>(
287       std::chrono::steady_clock::now().time_since_epoch()).count();
288
289     // If the event loop indicate that there were no more events, and
290     // we also didn't have any loop callbacks to run, there is nothing left to
291     // do.
292     if (res != 0 && !ranLoopCallbacks) {
293       // Since Notification Queue is marked 'internal' some events may not have
294       // run.  Run them manually if so, and continue looping.
295       //
296       if (getNotificationQueueSize() > 0) {
297         fnRunner_->handlerReady(0);
298       } else {
299         break;
300       }
301     }
302
303     VLOG(5) << "EventBase " << this << " loop time: " << getTimeDelta(&prev);
304   }
305   // Reset stop_ so loop() can be called again
306   stop_ = false;
307
308   if (res < 0) {
309     LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
310     return false;
311   } else if (res == 1) {
312     VLOG(5) << "EventBase: ran out of events (exiting loop)!";
313   } else if (res > 1) {
314     LOG(ERROR) << "EventBase: unknown event loop result = " << res;
315     return false;
316   }
317
318   loopThread_.store(0, std::memory_order_release);
319
320   VLOG(5) << "EventBase(): Done with loop.";
321   return true;
322 }
323
324 void EventBase::loopForever() {
325   // Update the notification queue event to treat it as a normal (non-internal)
326   // event.  The notification queue event always remains installed, and the main
327   // loop won't exit with it installed.
328   fnRunner_->stopConsuming();
329   fnRunner_->startConsuming(this, queue_.get());
330
331   bool ret = loop();
332
333   // Restore the notification queue internal flag
334   fnRunner_->stopConsuming();
335   fnRunner_->startConsumingInternal(this, queue_.get());
336
337   if (!ret) {
338     folly::throwSystemError("error in EventBase::loopForever()");
339   }
340 }
341
342 bool EventBase::bumpHandlingTime() {
343   VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
344     " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
345   if(nothingHandledYet()) {
346     latestLoopCnt_ = nextLoopCnt_;
347     // set the time
348     startWork_ = std::chrono::duration_cast<std::chrono::microseconds>(
349       std::chrono::steady_clock::now().time_since_epoch()).count();
350
351     VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__ <<
352       " (loop) startWork_ " << startWork_;
353     return true;
354   }
355   return false;
356 }
357
358 void EventBase::terminateLoopSoon() {
359   VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
360
361   if (!isRunning()) {
362     return;
363   }
364
365   // Set stop to true, so the event loop will know to exit.
366   // TODO: We should really use an atomic operation here with a release
367   // barrier.
368   stop_ = true;
369
370   // Call event_base_loopbreak() so that libevent will exit the next time
371   // around the loop.
372   event_base_loopbreak(evb_);
373
374   // If terminateLoopSoon() is called from another thread,
375   // the EventBase thread might be stuck waiting for events.
376   // In this case, it won't wake up and notice that stop_ is set until it
377   // receives another event.  Send an empty frame to the notification queue
378   // so that the event loop will wake up even if there are no other events.
379   //
380   // We don't care about the return value of trySendFrame().  If it fails
381   // this likely means the EventBase already has lots of events waiting
382   // anyway.
383   try {
384     queue_->putMessage(std::make_pair(nullptr, nullptr));
385   } catch (...) {
386     // We don't care if putMessage() fails.  This likely means
387     // the EventBase already has lots of events waiting anyway.
388   }
389 }
390
391 void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
392   DCHECK(isInEventBaseThread());
393   callback->cancelLoopCallback();
394   callback->context_ = RequestContext::saveContext();
395   if (runOnceCallbacks_ != nullptr && thisIteration) {
396     runOnceCallbacks_->push_back(*callback);
397   } else {
398     loopCallbacks_.push_back(*callback);
399   }
400 }
401
402 void EventBase::runInLoop(const Cob& cob, bool thisIteration) {
403   DCHECK(isInEventBaseThread());
404   Tr1FunctionLoopCallback* wrapper = new Tr1FunctionLoopCallback(cob);
405   wrapper->context_ = RequestContext::saveContext();
406   if (runOnceCallbacks_ != nullptr && thisIteration) {
407     runOnceCallbacks_->push_back(*wrapper);
408   } else {
409     loopCallbacks_.push_back(*wrapper);
410   }
411 }
412
413 bool EventBase::runInEventBaseThread(void (*fn)(void*), void* arg) {
414   // Send the message.
415   // It will be received by the FunctionRunner in the EventBase's thread.
416
417   // We try not to schedule nullptr callbacks
418   if (!fn) {
419     LOG(ERROR) << "EventBase " << this
420                << ": Scheduling nullptr callbacks is not allowed";
421     return false;
422   }
423
424   // Short-circuit if we are already in our event base
425   if (inRunningEventBaseThread()) {
426     runInLoop(new RunInLoopCallback(fn, arg));
427     return true;
428
429   }
430
431   try {
432     queue_->putMessage(std::make_pair(fn, arg));
433   } catch (const std::exception& ex) {
434     LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
435                << fn << "for EventBase thread: " << ex.what();
436     return false;
437   }
438
439   return true;
440 }
441
442 bool EventBase::runInEventBaseThread(const Cob& fn) {
443   // Short-circuit if we are already in our event base
444   if (inRunningEventBaseThread()) {
445     runInLoop(fn);
446     return true;
447   }
448
449   Cob* fnCopy;
450   // Allocate a copy of the function so we can pass it to the other thread
451   // The other thread will delete this copy once the function has been run
452   try {
453     fnCopy = new Cob(fn);
454   } catch (const std::bad_alloc& ex) {
455     LOG(ERROR) << "failed to allocate tr::function copy "
456                << "for runInEventBaseThread()";
457     return false;
458   }
459
460   if (!runInEventBaseThread(&EventBase::runTr1FunctionPtr, fnCopy)) {
461     delete fnCopy;
462     return false;
463   }
464
465   return true;
466 }
467
468 bool EventBase::runAfterDelay(const Cob& cob,
469                                int milliseconds,
470                                TimeoutManager::InternalEnum in) {
471   CobTimeout* timeout = new CobTimeout(this, cob, in);
472   if (!timeout->scheduleTimeout(milliseconds)) {
473     delete timeout;
474     return false;
475   }
476
477   pendingCobTimeouts_.push_back(*timeout);
478   return true;
479 }
480
481 bool EventBase::runLoopCallbacks(bool setContext) {
482   if (!loopCallbacks_.empty()) {
483     bumpHandlingTime();
484     // Swap the loopCallbacks_ list with a temporary list on our stack.
485     // This way we will only run callbacks scheduled at the time
486     // runLoopCallbacks() was invoked.
487     //
488     // If any of these callbacks in turn call runInLoop() to schedule more
489     // callbacks, those new callbacks won't be run until the next iteration
490     // around the event loop.  This prevents runInLoop() callbacks from being
491     // able to start file descriptor and timeout based events.
492     LoopCallbackList currentCallbacks;
493     currentCallbacks.swap(loopCallbacks_);
494     runOnceCallbacks_ = &currentCallbacks;
495
496     while (!currentCallbacks.empty()) {
497       LoopCallback* callback = &currentCallbacks.front();
498       currentCallbacks.pop_front();
499       if (setContext) {
500         RequestContext::setContext(callback->context_);
501       }
502       callback->runLoopCallback();
503     }
504
505     runOnceCallbacks_ = nullptr;
506     return true;
507   }
508   return false;
509 }
510
511 void EventBase::initNotificationQueue() {
512   // Infinite size queue
513   queue_.reset(new NotificationQueue<std::pair<void (*)(void*), void*>>());
514
515   // We allocate fnRunner_ separately, rather than declaring it directly
516   // as a member of EventBase solely so that we don't need to include
517   // NotificationQueue.h from EventBase.h
518   fnRunner_.reset(new FunctionRunner());
519
520   // Mark this as an internal event, so event_base_loop() will return if
521   // there are no other events besides this one installed.
522   //
523   // Most callers don't care about the internal notification queue used by
524   // EventBase.  The queue is always installed, so if we did count the queue as
525   // an active event, loop() would never exit with no more events to process.
526   // Users can use loopForever() if they do care about the notification queue.
527   // (This is useful for EventBase threads that do nothing but process
528   // runInEventBaseThread() notifications.)
529   fnRunner_->startConsumingInternal(this, queue_.get());
530 }
531
532 void EventBase::SmoothLoopTime::setTimeInterval(uint64_t timeInterval) {
533   expCoeff_ = -1.0/timeInterval;
534   VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
535 }
536
537 void EventBase::SmoothLoopTime::reset(double value) {
538   value_ = value;
539 }
540
541 void EventBase::SmoothLoopTime::addSample(int64_t idle, int64_t busy) {
542     /*
543      * Position at which the busy sample is considered to be taken.
544      * (Allows to quickly skew our average without editing much code)
545      */
546     enum BusySamplePosition {
547       RIGHT = 0,  // busy sample placed at the end of the iteration
548       CENTER = 1, // busy sample placed at the middle point of the iteration
549       LEFT = 2,   // busy sample placed at the beginning of the iteration
550     };
551
552   VLOG(11) << "idle " << idle << " oldBusyLeftover_ " << oldBusyLeftover_ <<
553               " idle + oldBusyLeftover_ " << idle + oldBusyLeftover_ <<
554               " busy " << busy << " " << __PRETTY_FUNCTION__;
555   idle += oldBusyLeftover_ + busy;
556   oldBusyLeftover_ = (busy * BusySamplePosition::CENTER) / 2;
557   idle -= oldBusyLeftover_;
558
559   double coeff = exp(idle * expCoeff_);
560   value_ *= coeff;
561   value_ += (1.0 - coeff) * busy;
562 }
563
564 bool EventBase::nothingHandledYet() {
565   VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
566   return (nextLoopCnt_ != latestLoopCnt_);
567 }
568
569 /* static */
570 void EventBase::runTr1FunctionPtr(Cob* fn) {
571   // The function should never throw an exception, because we have no
572   // way of knowing what sort of error handling to perform.
573   //
574   // If it does throw, log a message and abort the program.
575   try {
576     (*fn)();
577   } catch (const std::exception &ex) {
578     LOG(ERROR) << "runInEventBaseThread() std::function threw a "
579                << typeid(ex).name() << " exception: " << ex.what();
580     abort();
581   } catch (...) {
582     LOG(ERROR) << "runInEventBaseThread() std::function threw an exception";
583     abort();
584   }
585
586   // The function object was allocated by runInEventBaseThread().
587   // Delete it once it has been run.
588   delete fn;
589 }
590
591 EventBase::RunInLoopCallback::RunInLoopCallback(void (*fn)(void*), void* arg)
592     : fn_(fn)
593     , arg_(arg) {}
594
595 void EventBase::RunInLoopCallback::runLoopCallback() noexcept {
596   fn_(arg_);
597   delete this;
598 }
599
600 void EventBase::attachTimeoutManager(AsyncTimeout* obj,
601                                       InternalEnum internal) {
602
603   struct event* ev = obj->getEvent();
604   assert(ev->ev_base == nullptr);
605
606   event_base_set(getLibeventBase(), ev);
607   if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
608     // Set the EVLIST_INTERNAL flag
609     ev->ev_flags |= EVLIST_INTERNAL;
610   }
611 }
612
613 void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
614   cancelTimeout(obj);
615   struct event* ev = obj->getEvent();
616   ev->ev_base = nullptr;
617 }
618
619 bool EventBase::scheduleTimeout(AsyncTimeout* obj,
620                                  std::chrono::milliseconds timeout) {
621   assert(isInEventBaseThread());
622   // Set up the timeval and add the event
623   struct timeval tv;
624   tv.tv_sec = timeout.count() / 1000LL;
625   tv.tv_usec = (timeout.count() % 1000LL) * 1000LL;
626
627   struct event* ev = obj->getEvent();
628   if (event_add(ev, &tv) < 0) {
629     LOG(ERROR) << "EventBase: failed to schedule timeout: " << strerror(errno);
630     return false;
631   }
632
633   return true;
634 }
635
636 void EventBase::cancelTimeout(AsyncTimeout* obj) {
637   assert(isInEventBaseThread());
638   struct event* ev = obj->getEvent();
639   if (EventUtil::isEventRegistered(ev)) {
640     event_del(ev);
641   }
642 }
643
644 void EventBase::setName(const std::string& name) {
645   assert(isInEventBaseThread());
646   name_ = name;
647 #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
648   if (isRunning()) {
649     pthread_setname_np(loopThread_.load(std::memory_order_relaxed),
650                        name_.c_str());
651   }
652 #endif
653 }
654
655 const std::string& EventBase::getName() {
656   assert(isInEventBaseThread());
657   return name_;
658 }
659
660 } // folly