Add loopOnce() method to folly::EventBase
[folly.git] / folly / io / async / EventBase.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <glog/logging.h>
20 #include "folly/io/async/AsyncTimeout.h"
21 #include "folly/io/async/TimeoutManager.h"
22 #include <memory>
23 #include <stack>
24 #include <list>
25 #include <queue>
26 #include <cstdlib>
27 #include <set>
28 #include <utility>
29 #include <boost/intrusive/list.hpp>
30 #include <boost/utility.hpp>
31 #include <functional>
32 #include <event.h>  // libevent
33 #include <errno.h>
34 #include <math.h>
35 #include <atomic>
36
37 namespace folly {
38
39 typedef std::function<void()> Cob;
40 template <typename MessageT>
41 class NotificationQueue;
42
43 class EventBaseObserver {
44  public:
45   virtual ~EventBaseObserver() {}
46
47   virtual uint32_t getSampleRate() const = 0;
48
49   virtual void loopSample(
50     int64_t busyTime, int64_t idleTime) = 0;
51 };
52
53 /**
54  * This class is a wrapper for all asynchronous I/O processing functionality
55  *
56  * EventBase provides a main loop that notifies EventHandler callback objects
57  * when I/O is ready on a file descriptor, and notifies AsyncTimeout objects
58  * when a specified timeout has expired.  More complex, higher-level callback
59  * mechanisms can then be built on top of EventHandler and AsyncTimeout.
60  *
61  * A EventBase object can only drive an event loop for a single thread.  To
62  * take advantage of multiple CPU cores, most asynchronous I/O servers have one
63  * thread per CPU, and use a separate EventBase for each thread.
64  *
65  * In general, most EventBase methods may only be called from the thread
66  * running the EventBase's loop.  There are a few exceptions to this rule, for
67  * methods that are explicitly intended to allow communication with a
68  * EventBase from other threads.  When it is safe to call a method from
69  * another thread it is explicitly listed in the method comments.
70  */
71 class EventBase : private boost::noncopyable, public TimeoutManager {
72  public:
73   /**
74    * A callback interface to use with runInLoop()
75    *
76    * Derive from this class if you need to delay some code execution until the
77    * next iteration of the event loop.  This allows you to schedule code to be
78    * invoked from the top-level of the loop, after your immediate callers have
79    * returned.
80    *
81    * If a LoopCallback object is destroyed while it is scheduled to be run in
82    * the next loop iteration, it will automatically be cancelled.
83    */
84   class LoopCallback {
85    public:
86     virtual ~LoopCallback() {}
87
88     virtual void runLoopCallback() noexcept = 0;
89     void cancelLoopCallback() {
90       hook_.unlink();
91     }
92
93     bool isLoopCallbackScheduled() const {
94       return hook_.is_linked();
95     }
96
97    private:
98     typedef boost::intrusive::list_member_hook<
99       boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
100
101     ListHook hook_;
102
103     typedef boost::intrusive::list<
104       LoopCallback,
105       boost::intrusive::member_hook<LoopCallback, ListHook,
106                                     &LoopCallback::hook_>,
107       boost::intrusive::constant_time_size<false> > List;
108
109     // EventBase needs access to LoopCallbackList (and therefore to hook_)
110     friend class EventBase;
111     std::shared_ptr<RequestContext> context_;
112   };
113
114   /**
115    * Create a new EventBase object.
116    */
117   EventBase();
118
119   /**
120    * Create a new EventBase object that will use the specified libevent
121    * event_base object to drive the event loop.
122    *
123    * The EventBase will take ownership of this event_base, and will call
124    * event_base_free(evb) when the EventBase is destroyed.
125    */
126   explicit EventBase(event_base* evb);
127   ~EventBase();
128
129   /**
130    * Runs the event loop.
131    *
132    * loop() will loop waiting for I/O or timeouts and invoking EventHandler
133    * and AsyncTimeout callbacks as their events become ready.  loop() will
134    * only return when there are no more events remaining to process, or after
135    * terminateLoopSoon() has been called.
136    *
137    * loop() may be called again to restart event processing after a previous
138    * call to loop() or loopForever() has returned.
139    *
140    * Returns true if the loop completed normally (if it processed all
141    * outstanding requests, or if terminateLoopSoon() was called).  If an error
142    * occurs waiting for events, false will be returned.
143    */
144   bool loop();
145
146   /**
147    * Wait for some events to become active, run them, then return.
148    *
149    * This is useful for callers that want to run the loop manually.
150    *
151    * Returns the same result as loop().
152    */
153   bool loopOnce();
154
155   /**
156    * Runs the event loop.
157    *
158    * loopForever() behaves like loop(), except that it keeps running even if
159    * when there are no more user-supplied EventHandlers or AsyncTimeouts
160    * registered.  It will only return after terminateLoopSoon() has been
161    * called.
162    *
163    * This is useful for callers that want to wait for other threads to call
164    * runInEventBaseThread(), even when there are no other scheduled events.
165    *
166    * loopForever() may be called again to restart event processing after a
167    * previous call to loop() or loopForever() has returned.
168    *
169    * Throws a std::system_error if an error occurs.
170    */
171   void loopForever();
172
173   /**
174    * Causes the event loop to exit soon.
175    *
176    * This will cause an existing call to loop() or loopForever() to stop event
177    * processing and return, even if there are still events remaining to be
178    * processed.
179    *
180    * It is safe to call terminateLoopSoon() from another thread to cause loop()
181    * to wake up and return in the EventBase loop thread.  terminateLoopSoon()
182    * may also be called from the loop thread itself (for example, a
183    * EventHandler or AsyncTimeout callback may call terminateLoopSoon() to
184    * cause the loop to exit after the callback returns.)
185    *
186    * Note that the caller is responsible for ensuring that cleanup of all event
187    * callbacks occurs properly.  Since terminateLoopSoon() causes the loop to
188    * exit even when there are pending events present, there may be remaining
189    * callbacks present waiting to be invoked.  If the loop is later restarted
190    * pending events will continue to be processed normally, however if the
191    * EventBase is destroyed after calling terminateLoopSoon() it is the
192    * caller's responsibility to ensure that cleanup happens properly even if
193    * some outstanding events are never processed.
194    */
195   void terminateLoopSoon();
196
197   /**
198    * Adds the given callback to a queue of things run after the current pass
199    * through the event loop completes.  Note that if this callback calls
200    * runInLoop() the new callback won't be called until the main event loop
201    * has gone through a cycle.
202    *
203    * This method may only be called from the EventBase's thread.  This
204    * essentially allows an event handler to schedule an additional callback to
205    * be invoked after it returns.
206    *
207    * Use runInEventBaseThread() to schedule functions from another thread.
208    *
209    * The thisIteration parameter makes this callback run in this loop
210    * iteration, instead of the next one, even if called from a
211    * runInLoop callback (normal io callbacks that call runInLoop will
212    * always run in this iteration).  This was originally added to
213    * support detachEventBase, as a user callback may have called
214    * terminateLoopSoon(), but we want to make sure we detach.  Also,
215    * detachEventBase almost always must be called from the base event
216    * loop to ensure the stack is unwound, since most users of
217    * EventBase are not thread safe.
218    *
219    * Ideally we would not need thisIteration, and instead just use
220    * runInLoop with loop() (instead of terminateLoopSoon).
221    */
222   void runInLoop(LoopCallback* callback, bool thisIteration = false);
223
224   /**
225    * Convenience function to call runInLoop() with a std::function.
226    *
227    * This creates a LoopCallback object to wrap the std::function, and invoke
228    * the std::function when the loop callback fires.  This is slightly more
229    * expensive than defining your own LoopCallback, but more convenient in
230    * areas that aren't performance sensitive where you just want to use
231    * std::bind.  (std::bind is fairly slow on even by itself.)
232    *
233    * This method may only be called from the EventBase's thread.  This
234    * essentially allows an event handler to schedule an additional callback to
235    * be invoked after it returns.
236    *
237    * Use runInEventBaseThread() to schedule functions from another thread.
238    */
239   void runInLoop(const Cob& c, bool thisIteration = false);
240
241   void runInLoop(Cob&& c, bool thisIteration = false);
242
243   /**
244    * Run the specified function in the EventBase's thread.
245    *
246    * This method is thread-safe, and may be called from another thread.
247    *
248    * If runInEventBaseThread() is called when the EventBase loop is not
249    * running, the function call will be delayed until the next time the loop is
250    * started.
251    *
252    * If runInEventBaseThread() returns true the function has successfully been
253    * scheduled to run in the loop thread.  However, if the loop is terminated
254    * (and never later restarted) before it has a chance to run the requested
255    * function, the function may never be run at all.  The caller is responsible
256    * for handling this situation correctly if they may terminate the loop with
257    * outstanding runInEventBaseThread() calls pending.
258    *
259    * If two calls to runInEventBaseThread() are made from the same thread, the
260    * functions will always be run in the order that they were scheduled.
261    * Ordering between functions scheduled from separate threads is not
262    * guaranteed.
263    *
264    * @param fn  The function to run.  The function must not throw any
265    *     exceptions.
266    * @param arg An argument to pass to the function.
267    *
268    * @return Returns true if the function was successfully scheduled, or false
269    *         if there was an error scheduling the function.
270    */
271   template<typename T>
272   bool runInEventBaseThread(void (*fn)(T*), T* arg) {
273     return runInEventBaseThread(reinterpret_cast<void (*)(void*)>(fn),
274                                 reinterpret_cast<void*>(arg));
275   }
276
277   bool runInEventBaseThread(void (*fn)(void*), void* arg);
278
279   /**
280    * Run the specified function in the EventBase's thread
281    *
282    * This version of runInEventBaseThread() takes a std::function object.
283    * Note that this is less efficient than the version that takes a plain
284    * function pointer and void* argument, as it has to allocate memory to copy
285    * the std::function object.
286    *
287    * If the EventBase loop is terminated before it has a chance to run this
288    * function, the allocated memory will be leaked.  The caller is responsible
289    * for ensuring that the EventBase loop is not terminated before this
290    * function can run.
291    *
292    * The function must not throw any exceptions.
293    */
294   bool runInEventBaseThread(const Cob& fn);
295
296   /**
297    * Runs the given Cob at some time after the specified number of
298    * milliseconds.  (No guarantees exactly when.)
299    *
300    * @return  true iff the cob was successfully registered.
301    */
302   bool runAfterDelay(
303       const Cob& c,
304       int milliseconds,
305       TimeoutManager::InternalEnum = TimeoutManager::InternalEnum::NORMAL);
306
307   /**
308    * Set the maximum desired latency in us and provide a callback which will be
309    * called when that latency is exceeded.
310    */
311   void setMaxLatency(int64_t maxLatency, const Cob& maxLatencyCob) {
312     maxLatency_ = maxLatency;
313     maxLatencyCob_ = maxLatencyCob;
314   }
315
316   /**
317    * Set smoothing coefficient for loop load average; # of milliseconds
318    * for exp(-1) (1/2.71828...) decay.
319    */
320   void setLoadAvgMsec(uint32_t ms);
321
322   /**
323    * reset the load average to a desired value
324    */
325   void resetLoadAvg(double value = 0.0);
326
327   /**
328    * Get the average loop time in microseconds (an exponentially-smoothed ave)
329    */
330   double getAvgLoopTime() const {
331     return avgLoopTime_.get();
332   }
333
334   /**
335     * check if the event base loop is running.
336    */
337   bool isRunning() const {
338     return loopThread_.load(std::memory_order_relaxed) != 0;
339   }
340
341   /**
342    * wait until the event loop starts (after starting the event loop thread).
343    */
344   void waitUntilRunning();
345
346   int getNotificationQueueSize() const;
347
348   /**
349    * Verify that current thread is the EventBase thread, if the EventBase is
350    * running.
351    */
352   bool isInEventBaseThread() const {
353     auto tid = loopThread_.load(std::memory_order_relaxed);
354     return tid == 0 || pthread_equal(tid, pthread_self());
355   }
356
357   bool inRunningEventBaseThread() const {
358     return pthread_equal(
359       loopThread_.load(std::memory_order_relaxed), pthread_self());
360   }
361
362   // --------- interface to underlying libevent base ------------
363   // Avoid using these functions if possible.  These functions are not
364   // guaranteed to always be present if we ever provide alternative EventBase
365   // implementations that do not use libevent internally.
366   event_base* getLibeventBase() const { return evb_; }
367   static const char* getLibeventVersion() { return event_get_version(); }
368   static const char* getLibeventMethod() { return event_get_method(); }
369
370   /**
371    * only EventHandler/AsyncTimeout subclasses and ourselves should
372    * ever call this.
373    *
374    * This is used to mark the beginning of a new loop cycle by the
375    * first handler fired within that cycle.
376    *
377    */
378   bool bumpHandlingTime();
379
380   class SmoothLoopTime {
381    public:
382     explicit SmoothLoopTime(uint64_t timeInterval)
383       : expCoeff_(-1.0/timeInterval)
384       , value_(0.0)
385       , oldBusyLeftover_(0) {
386       VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
387     }
388
389     void setTimeInterval(uint64_t timeInterval);
390     void reset(double value = 0.0);
391
392     void addSample(int64_t idle, int64_t busy);
393
394     double get() const {
395       return value_;
396     }
397
398     void dampen(double factor) {
399       value_ *= factor;
400     }
401
402    private:
403     double  expCoeff_;
404     double  value_;
405     int64_t oldBusyLeftover_;
406   };
407
408   void setObserver(
409     const std::shared_ptr<EventBaseObserver>& observer) {
410     observer_ = observer;
411   }
412
413   const std::shared_ptr<EventBaseObserver>& getObserver() {
414     return observer_;
415   }
416
417   /**
418    * Set the name of the thread that runs this event base.
419    */
420   void setName(const std::string& name);
421
422   /**
423    * Returns the name of the thread that runs this event base.
424    */
425   const std::string& getName();
426
427  private:
428
429   // TimeoutManager
430   void attachTimeoutManager(AsyncTimeout* obj,
431                             TimeoutManager::InternalEnum internal);
432
433   void detachTimeoutManager(AsyncTimeout* obj);
434
435   bool scheduleTimeout(AsyncTimeout* obj, std::chrono::milliseconds timeout);
436
437   void cancelTimeout(AsyncTimeout* obj);
438
439   bool isInTimeoutManagerThread() {
440     return isInEventBaseThread();
441   }
442
443   // Helper class used to short circuit runInEventBaseThread
444   class RunInLoopCallback : public LoopCallback {
445    public:
446     RunInLoopCallback(void (*fn)(void*), void* arg);
447     void runLoopCallback() noexcept;
448
449    private:
450     void (*fn_)(void*);
451     void* arg_;
452   };
453
454   /*
455    * Helper function that tells us whether we have already handled
456    * some event/timeout/callback in this loop iteration.
457    */
458   bool nothingHandledYet();
459
460   // --------- libevent callbacks (not for client use) ------------
461
462   static void runFunctionPtr(std::function<void()>* fn);
463
464   // small object used as a callback arg with enough info to execute the
465   // appropriate client-provided Cob
466   class CobTimeout : public AsyncTimeout {
467    public:
468     CobTimeout(EventBase* b, const Cob& c, TimeoutManager::InternalEnum in)
469         : AsyncTimeout(b, in), cob_(c) {}
470
471     virtual void timeoutExpired() noexcept;
472
473    private:
474     Cob cob_;
475
476    public:
477     typedef boost::intrusive::list_member_hook<
478       boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
479
480     ListHook hook;
481
482     typedef boost::intrusive::list<
483       CobTimeout,
484       boost::intrusive::member_hook<CobTimeout, ListHook, &CobTimeout::hook>,
485       boost::intrusive::constant_time_size<false> > List;
486   };
487
488   typedef LoopCallback::List LoopCallbackList;
489   class FunctionRunner;
490
491   bool loopBody(bool once = false);
492
493   // executes any callbacks queued by runInLoop(); returns false if none found
494   bool runLoopCallbacks(bool setContext = true);
495
496   void initNotificationQueue();
497
498   CobTimeout::List pendingCobTimeouts_;
499
500   LoopCallbackList loopCallbacks_;
501
502   // This will be null most of the time, but point to currentCallbacks
503   // if we are in the middle of running loop callbacks, such that
504   // runInLoop(..., true) will always run in the current loop
505   // iteration.
506   LoopCallbackList* runOnceCallbacks_;
507
508   // stop_ is set by terminateLoopSoon() and is used by the main loop
509   // to determine if it should exit
510   bool stop_;
511
512   // The ID of the thread running the main loop.
513   // 0 if loop is not running.
514   // Note: POSIX doesn't guarantee that 0 is an invalid pthread_t (or
515   // even that atomic<pthread_t> is valid), but that's how it is
516   // everywhere (at least on Linux, FreeBSD, and OSX).
517   std::atomic<pthread_t> loopThread_;
518
519   // pointer to underlying event_base class doing the heavy lifting
520   event_base* evb_;
521
522   // A notification queue for runInEventBaseThread() to use
523   // to send function requests to the EventBase thread.
524   std::unique_ptr<NotificationQueue<std::pair<void (*)(void*), void*>>> queue_;
525   std::unique_ptr<FunctionRunner> fnRunner_;
526
527   // limit for latency in microseconds (0 disables)
528   int64_t maxLatency_;
529
530   // exponentially-smoothed average loop time for latency-limiting
531   SmoothLoopTime avgLoopTime_;
532
533   // smoothed loop time used to invoke latency callbacks; differs from
534   // avgLoopTime_ in that it's scaled down after triggering a callback
535   // to reduce spamminess
536   SmoothLoopTime maxLatencyLoopTime_;
537
538   // callback called when latency limit is exceeded
539   Cob maxLatencyCob_;
540
541   // we'll wait this long before running deferred callbacks if the event
542   // loop is idle.
543   static const int kDEFAULT_IDLE_WAIT_USEC = 20000; // 20ms
544
545   // Wrap-around loop counter to detect beginning of each loop
546   uint64_t nextLoopCnt_;
547   uint64_t latestLoopCnt_;
548   uint64_t startWork_;
549
550   // Observer to export counters
551   std::shared_ptr<EventBaseObserver> observer_;
552   uint32_t observerSampleCount_;
553
554   // Name of the thread running this EventBase
555   std::string name_;
556 };
557
558 } // folly