Have EventBase implement wangle::Executor
[folly.git] / folly / io / async / EventBase.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <glog/logging.h>
20 #include <folly/io/async/AsyncTimeout.h>
21 #include <folly/io/async/TimeoutManager.h>
22 #include <folly/wangle/Executor.h>
23 #include <memory>
24 #include <stack>
25 #include <list>
26 #include <queue>
27 #include <cstdlib>
28 #include <set>
29 #include <utility>
30 #include <boost/intrusive/list.hpp>
31 #include <boost/utility.hpp>
32 #include <functional>
33 #include <event.h>  // libevent
34 #include <errno.h>
35 #include <math.h>
36 #include <atomic>
37
38 namespace folly {
39
40 typedef std::function<void()> Cob;
41 template <typename MessageT>
42 class NotificationQueue;
43
44 class EventBaseObserver {
45  public:
46   virtual ~EventBaseObserver() {}
47
48   virtual uint32_t getSampleRate() const = 0;
49
50   virtual void loopSample(
51     int64_t busyTime, int64_t idleTime) = 0;
52 };
53
54 /**
55  * This class is a wrapper for all asynchronous I/O processing functionality
56  *
57  * EventBase provides a main loop that notifies EventHandler callback objects
58  * when I/O is ready on a file descriptor, and notifies AsyncTimeout objects
59  * when a specified timeout has expired.  More complex, higher-level callback
60  * mechanisms can then be built on top of EventHandler and AsyncTimeout.
61  *
62  * A EventBase object can only drive an event loop for a single thread.  To
63  * take advantage of multiple CPU cores, most asynchronous I/O servers have one
64  * thread per CPU, and use a separate EventBase for each thread.
65  *
66  * In general, most EventBase methods may only be called from the thread
67  * running the EventBase's loop.  There are a few exceptions to this rule, for
68  * methods that are explicitly intended to allow communication with a
69  * EventBase from other threads.  When it is safe to call a method from
70  * another thread it is explicitly listed in the method comments.
71  */
72 class EventBase :
73   private boost::noncopyable, public TimeoutManager, public wangle::Executor
74 {
75  public:
76   /**
77    * A callback interface to use with runInLoop()
78    *
79    * Derive from this class if you need to delay some code execution until the
80    * next iteration of the event loop.  This allows you to schedule code to be
81    * invoked from the top-level of the loop, after your immediate callers have
82    * returned.
83    *
84    * If a LoopCallback object is destroyed while it is scheduled to be run in
85    * the next loop iteration, it will automatically be cancelled.
86    */
87   class LoopCallback {
88    public:
89     virtual ~LoopCallback() {}
90
91     virtual void runLoopCallback() noexcept = 0;
92     void cancelLoopCallback() {
93       hook_.unlink();
94     }
95
96     bool isLoopCallbackScheduled() const {
97       return hook_.is_linked();
98     }
99
100    private:
101     typedef boost::intrusive::list_member_hook<
102       boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
103
104     ListHook hook_;
105
106     typedef boost::intrusive::list<
107       LoopCallback,
108       boost::intrusive::member_hook<LoopCallback, ListHook,
109                                     &LoopCallback::hook_>,
110       boost::intrusive::constant_time_size<false> > List;
111
112     // EventBase needs access to LoopCallbackList (and therefore to hook_)
113     friend class EventBase;
114     std::shared_ptr<RequestContext> context_;
115   };
116
117   /**
118    * Create a new EventBase object.
119    */
120   EventBase();
121
122   /**
123    * Create a new EventBase object that will use the specified libevent
124    * event_base object to drive the event loop.
125    *
126    * The EventBase will take ownership of this event_base, and will call
127    * event_base_free(evb) when the EventBase is destroyed.
128    */
129   explicit EventBase(event_base* evb);
130   ~EventBase();
131
132   /**
133    * Runs the event loop.
134    *
135    * loop() will loop waiting for I/O or timeouts and invoking EventHandler
136    * and AsyncTimeout callbacks as their events become ready.  loop() will
137    * only return when there are no more events remaining to process, or after
138    * terminateLoopSoon() has been called.
139    *
140    * loop() may be called again to restart event processing after a previous
141    * call to loop() or loopForever() has returned.
142    *
143    * Returns true if the loop completed normally (if it processed all
144    * outstanding requests, or if terminateLoopSoon() was called).  If an error
145    * occurs waiting for events, false will be returned.
146    */
147   bool loop();
148
149   /**
150    * Wait for some events to become active, run them, then return.
151    *
152    * When EVLOOP_NONBLOCK is set in flags, the loop won't block if there
153    * are not any events to process.
154    *
155    * This is useful for callers that want to run the loop manually.
156    *
157    * Returns the same result as loop().
158    */
159   bool loopOnce(int flags = 0);
160
161   /**
162    * Runs the event loop.
163    *
164    * loopForever() behaves like loop(), except that it keeps running even if
165    * when there are no more user-supplied EventHandlers or AsyncTimeouts
166    * registered.  It will only return after terminateLoopSoon() has been
167    * called.
168    *
169    * This is useful for callers that want to wait for other threads to call
170    * runInEventBaseThread(), even when there are no other scheduled events.
171    *
172    * loopForever() may be called again to restart event processing after a
173    * previous call to loop() or loopForever() has returned.
174    *
175    * Throws a std::system_error if an error occurs.
176    */
177   void loopForever();
178
179   /**
180    * Causes the event loop to exit soon.
181    *
182    * This will cause an existing call to loop() or loopForever() to stop event
183    * processing and return, even if there are still events remaining to be
184    * processed.
185    *
186    * It is safe to call terminateLoopSoon() from another thread to cause loop()
187    * to wake up and return in the EventBase loop thread.  terminateLoopSoon()
188    * may also be called from the loop thread itself (for example, a
189    * EventHandler or AsyncTimeout callback may call terminateLoopSoon() to
190    * cause the loop to exit after the callback returns.)  If the loop is not
191    * running, this will cause the next call to loop to terminate soon after
192    * starting.  If a loop runs out of work (and so terminates on its own)
193    * concurrently with a call to terminateLoopSoon(), this may cause a race
194    * condition.
195    *
196    * Note that the caller is responsible for ensuring that cleanup of all event
197    * callbacks occurs properly.  Since terminateLoopSoon() causes the loop to
198    * exit even when there are pending events present, there may be remaining
199    * callbacks present waiting to be invoked.  If the loop is later restarted
200    * pending events will continue to be processed normally, however if the
201    * EventBase is destroyed after calling terminateLoopSoon() it is the
202    * caller's responsibility to ensure that cleanup happens properly even if
203    * some outstanding events are never processed.
204    */
205   void terminateLoopSoon();
206
207   /**
208    * Adds the given callback to a queue of things run after the current pass
209    * through the event loop completes.  Note that if this callback calls
210    * runInLoop() the new callback won't be called until the main event loop
211    * has gone through a cycle.
212    *
213    * This method may only be called from the EventBase's thread.  This
214    * essentially allows an event handler to schedule an additional callback to
215    * be invoked after it returns.
216    *
217    * Use runInEventBaseThread() to schedule functions from another thread.
218    *
219    * The thisIteration parameter makes this callback run in this loop
220    * iteration, instead of the next one, even if called from a
221    * runInLoop callback (normal io callbacks that call runInLoop will
222    * always run in this iteration).  This was originally added to
223    * support detachEventBase, as a user callback may have called
224    * terminateLoopSoon(), but we want to make sure we detach.  Also,
225    * detachEventBase almost always must be called from the base event
226    * loop to ensure the stack is unwound, since most users of
227    * EventBase are not thread safe.
228    *
229    * Ideally we would not need thisIteration, and instead just use
230    * runInLoop with loop() (instead of terminateLoopSoon).
231    */
232   void runInLoop(LoopCallback* callback, bool thisIteration = false);
233
234   /**
235    * Convenience function to call runInLoop() with a std::function.
236    *
237    * This creates a LoopCallback object to wrap the std::function, and invoke
238    * the std::function when the loop callback fires.  This is slightly more
239    * expensive than defining your own LoopCallback, but more convenient in
240    * areas that aren't performance sensitive where you just want to use
241    * std::bind.  (std::bind is fairly slow on even by itself.)
242    *
243    * This method may only be called from the EventBase's thread.  This
244    * essentially allows an event handler to schedule an additional callback to
245    * be invoked after it returns.
246    *
247    * Use runInEventBaseThread() to schedule functions from another thread.
248    */
249   void runInLoop(const Cob& c, bool thisIteration = false);
250
251   void runInLoop(Cob&& c, bool thisIteration = false);
252
253   /**
254    * Adds the given callback to a queue of things run before destruction
255    * of current EventBase.
256    *
257    * This allows users of EventBase that run in it, but don't control it,
258    * to be notified before EventBase gets destructed.
259    *
260    * Note: will be called from the thread that invoked EventBase destructor,
261    *       before the final run of loop callbacks.
262    */
263   void runOnDestruction(LoopCallback* callback);
264
265   void runBeforeLoop(LoopCallback* callback);
266
267   /**
268    * Run the specified function in the EventBase's thread.
269    *
270    * This method is thread-safe, and may be called from another thread.
271    *
272    * If runInEventBaseThread() is called when the EventBase loop is not
273    * running, the function call will be delayed until the next time the loop is
274    * started.
275    *
276    * If runInEventBaseThread() returns true the function has successfully been
277    * scheduled to run in the loop thread.  However, if the loop is terminated
278    * (and never later restarted) before it has a chance to run the requested
279    * function, the function may never be run at all.  The caller is responsible
280    * for handling this situation correctly if they may terminate the loop with
281    * outstanding runInEventBaseThread() calls pending.
282    *
283    * If two calls to runInEventBaseThread() are made from the same thread, the
284    * functions will always be run in the order that they were scheduled.
285    * Ordering between functions scheduled from separate threads is not
286    * guaranteed.
287    *
288    * @param fn  The function to run.  The function must not throw any
289    *     exceptions.
290    * @param arg An argument to pass to the function.
291    *
292    * @return Returns true if the function was successfully scheduled, or false
293    *         if there was an error scheduling the function.
294    */
295   template<typename T>
296   bool runInEventBaseThread(void (*fn)(T*), T* arg) {
297     return runInEventBaseThread(reinterpret_cast<void (*)(void*)>(fn),
298                                 reinterpret_cast<void*>(arg));
299   }
300
301   bool runInEventBaseThread(void (*fn)(void*), void* arg);
302
303   /**
304    * Run the specified function in the EventBase's thread
305    *
306    * This version of runInEventBaseThread() takes a std::function object.
307    * Note that this is less efficient than the version that takes a plain
308    * function pointer and void* argument, as it has to allocate memory to copy
309    * the std::function object.
310    *
311    * If the EventBase loop is terminated before it has a chance to run this
312    * function, the allocated memory will be leaked.  The caller is responsible
313    * for ensuring that the EventBase loop is not terminated before this
314    * function can run.
315    *
316    * The function must not throw any exceptions.
317    */
318   bool runInEventBaseThread(const Cob& fn);
319
320   /**
321    * Runs the given Cob at some time after the specified number of
322    * milliseconds.  (No guarantees exactly when.)
323    *
324    * @return  true iff the cob was successfully registered.
325    */
326   bool runAfterDelay(
327       const Cob& c,
328       int milliseconds,
329       TimeoutManager::InternalEnum = TimeoutManager::InternalEnum::NORMAL);
330
331   /**
332    * Set the maximum desired latency in us and provide a callback which will be
333    * called when that latency is exceeded.
334    */
335   void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) {
336     maxLatency_ = maxLatency;
337     maxLatencyCob_ = maxLatencyCob;
338   }
339
340   /**
341    * Set smoothing coefficient for loop load average; # of milliseconds
342    * for exp(-1) (1/2.71828...) decay.
343    */
344   void setLoadAvgMsec(uint32_t ms);
345
346   /**
347    * reset the load average to a desired value
348    */
349   void resetLoadAvg(double value = 0.0);
350
351   /**
352    * Get the average loop time in microseconds (an exponentially-smoothed ave)
353    */
354   double getAvgLoopTime() const {
355     return avgLoopTime_.get();
356   }
357
358   /**
359     * check if the event base loop is running.
360    */
361   bool isRunning() const {
362     return loopThread_.load(std::memory_order_relaxed) != 0;
363   }
364
365   /**
366    * wait until the event loop starts (after starting the event loop thread).
367    */
368   void waitUntilRunning();
369
370   int getNotificationQueueSize() const;
371
372   void setMaxReadAtOnce(uint32_t maxAtOnce);
373
374   /**
375    * Verify that current thread is the EventBase thread, if the EventBase is
376    * running.
377    */
378   bool isInEventBaseThread() const {
379     auto tid = loopThread_.load(std::memory_order_relaxed);
380     return tid == 0 || pthread_equal(tid, pthread_self());
381   }
382
383   bool inRunningEventBaseThread() const {
384     return pthread_equal(
385       loopThread_.load(std::memory_order_relaxed), pthread_self());
386   }
387
388   // --------- interface to underlying libevent base ------------
389   // Avoid using these functions if possible.  These functions are not
390   // guaranteed to always be present if we ever provide alternative EventBase
391   // implementations that do not use libevent internally.
392   event_base* getLibeventBase() const { return evb_; }
393   static const char* getLibeventVersion();
394   static const char* getLibeventMethod();
395
396   /**
397    * only EventHandler/AsyncTimeout subclasses and ourselves should
398    * ever call this.
399    *
400    * This is used to mark the beginning of a new loop cycle by the
401    * first handler fired within that cycle.
402    *
403    */
404   bool bumpHandlingTime();
405
406   class SmoothLoopTime {
407    public:
408     explicit SmoothLoopTime(uint64_t timeInterval)
409       : expCoeff_(-1.0/timeInterval)
410       , value_(0.0)
411       , oldBusyLeftover_(0) {
412       VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
413     }
414
415     void setTimeInterval(uint64_t timeInterval);
416     void reset(double value = 0.0);
417
418     void addSample(int64_t idle, int64_t busy);
419
420     double get() const {
421       return value_;
422     }
423
424     void dampen(double factor) {
425       value_ *= factor;
426     }
427
428    private:
429     double  expCoeff_;
430     double  value_;
431     int64_t oldBusyLeftover_;
432   };
433
434   void setObserver(
435     const std::shared_ptr<EventBaseObserver>& observer) {
436     observer_ = observer;
437   }
438
439   const std::shared_ptr<EventBaseObserver>& getObserver() {
440     return observer_;
441   }
442
443   /**
444    * Set the name of the thread that runs this event base.
445    */
446   void setName(const std::string& name);
447
448   /**
449    * Returns the name of the thread that runs this event base.
450    */
451   const std::string& getName();
452
453   /// Implements the wangle::Executor interface
454   void add(Cob fn) override {
455     // runInEventBaseThread() takes a const&,
456     // so no point in doing std::move here.
457     runInEventBaseThread(fn);
458   }
459
460  private:
461
462   // TimeoutManager
463   void attachTimeoutManager(AsyncTimeout* obj,
464                             TimeoutManager::InternalEnum internal);
465
466   void detachTimeoutManager(AsyncTimeout* obj);
467
468   bool scheduleTimeout(AsyncTimeout* obj, std::chrono::milliseconds timeout);
469
470   void cancelTimeout(AsyncTimeout* obj);
471
472   bool isInTimeoutManagerThread() {
473     return isInEventBaseThread();
474   }
475
476   // Helper class used to short circuit runInEventBaseThread
477   class RunInLoopCallback : public LoopCallback {
478    public:
479     RunInLoopCallback(void (*fn)(void*), void* arg);
480     void runLoopCallback() noexcept;
481
482    private:
483     void (*fn_)(void*);
484     void* arg_;
485   };
486
487   /*
488    * Helper function that tells us whether we have already handled
489    * some event/timeout/callback in this loop iteration.
490    */
491   bool nothingHandledYet();
492
493   // --------- libevent callbacks (not for client use) ------------
494
495   static void runFunctionPtr(std::function<void()>* fn);
496
497   // small object used as a callback arg with enough info to execute the
498   // appropriate client-provided Cob
499   class CobTimeout : public AsyncTimeout {
500    public:
501     CobTimeout(EventBase* b, const Cob& c, TimeoutManager::InternalEnum in)
502         : AsyncTimeout(b, in), cob_(c) {}
503
504     virtual void timeoutExpired() noexcept;
505
506    private:
507     Cob cob_;
508
509    public:
510     typedef boost::intrusive::list_member_hook<
511       boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
512
513     ListHook hook;
514
515     typedef boost::intrusive::list<
516       CobTimeout,
517       boost::intrusive::member_hook<CobTimeout, ListHook, &CobTimeout::hook>,
518       boost::intrusive::constant_time_size<false> > List;
519   };
520
521   typedef LoopCallback::List LoopCallbackList;
522   class FunctionRunner;
523
524   bool loopBody(int flags = 0);
525
526   // executes any callbacks queued by runInLoop(); returns false if none found
527   bool runLoopCallbacks(bool setContext = true);
528
529   void initNotificationQueue();
530
531   CobTimeout::List pendingCobTimeouts_;
532
533   LoopCallbackList loopCallbacks_;
534   LoopCallbackList noWaitLoopCallbacks_;
535   LoopCallbackList onDestructionCallbacks_;
536
537   // This will be null most of the time, but point to currentCallbacks
538   // if we are in the middle of running loop callbacks, such that
539   // runInLoop(..., true) will always run in the current loop
540   // iteration.
541   LoopCallbackList* runOnceCallbacks_;
542
543   // stop_ is set by terminateLoopSoon() and is used by the main loop
544   // to determine if it should exit
545   bool stop_;
546
547   // The ID of the thread running the main loop.
548   // 0 if loop is not running.
549   // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or
550   // even that atomic<pthread_t> is valid), but that's how it is
551   // everywhere (at least on Linux, FreeBSD, and OSX).
552   std::atomic<pthread_t> loopThread_;
553
554   // pointer to underlying event_base class doing the heavy lifting
555   event_base* evb_;
556
557   // A notification queue for runInEventBaseThread() to use
558   // to send function requests to the EventBase thread.
559   std::unique_ptr<NotificationQueue<std::pair<void (*)(void*), void*>>> queue_;
560   std::unique_ptr<FunctionRunner> fnRunner_;
561
562   // limit for latency in microseconds (0 disables)
563   int64_t maxLatency_;
564
565   // exponentially-smoothed average loop time for latency-limiting
566   SmoothLoopTime avgLoopTime_;
567
568   // smoothed loop time used to invoke latency callbacks; differs from
569   // avgLoopTime_ in that it's scaled down after triggering a callback
570   // to reduce spamminess
571   SmoothLoopTime maxLatencyLoopTime_;
572
573   // callback called when latency limit is exceeded
574   Cob maxLatencyCob_;
575
576   // we'll wait this long before running deferred callbacks if the event
577   // loop is idle.
578   static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms
579
580   // Wrap-around loop counter to detect beginning of each loop
581   uint64_t nextLoopCnt_;
582   uint64_t latestLoopCnt_;
583   uint64_t startWork_;
584
585   // Observer to export counters
586   std::shared_ptr<EventBaseObserver> observer_;
587   uint32_t observerSampleCount_;
588
589   // Name of the thread running this EventBase
590   std::string name_;
591 };
592
593 } // folly