62c41a1eff3f5c41e5adb8519ab68cbf31e9af1e
[folly.git] / folly / experimental / fibers / FiberManager.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <functional>
19 #include <memory>
20 #include <queue>
21 #include <thread>
22 #include <typeindex>
23 #include <unordered_set>
24 #include <vector>
25
26 #include <folly/AtomicIntrusiveLinkedList.h>
27 #include <folly/Executor.h>
28 #include <folly/Likely.h>
29 #include <folly/IntrusiveList.h>
30 #include <folly/io/async/Request.h>
31 #include <folly/futures/Try.h>
32
33 #include <folly/experimental/ExecutionObserver.h>
34 #include <folly/experimental/fibers/BoostContextCompatibility.h>
35 #include <folly/experimental/fibers/Fiber.h>
36 #include <folly/experimental/fibers/GuardPageAllocator.h>
37 #include <folly/experimental/fibers/TimeoutController.h>
38 #include <folly/experimental/fibers/traits.h>
39
40 namespace folly { namespace fibers {
41
42 class Baton;
43 class Fiber;
44 class LoopController;
45 class TimeoutController;
46
47 template <typename T>
48 class LocalType {
49 };
50
51 class InlineFunctionRunner {
52  public:
53   virtual ~InlineFunctionRunner() {}
54
55   /**
56    * func must be executed inline and only once.
57    */
58   virtual void run(std::function<void()> func) = 0;
59 };
60
61 /**
62  * @class FiberManager
63  * @brief Single-threaded task execution engine.
64  *
65  * FiberManager allows semi-parallel task execution on the same thread. Each
66  * task can notify FiberManager that it is blocked on something (via await())
67  * call. This will pause execution of this task and it will be resumed only
68  * when it is unblocked (via setData()).
69  */
70 class FiberManager : public ::folly::Executor {
71  public:
72   struct Options {
73     static constexpr size_t kDefaultStackSize{16 * 1024};
74
75     /**
76      * Maximum stack size for fibers which will be used for executing all the
77      * tasks.
78      */
79     size_t stackSize{kDefaultStackSize};
80
81     /**
82      * Record exact amount of stack used.
83      *
84      * This is fairly expensive: we fill each newly allocated stack
85      * with some known value and find the boundary of unused stack
86      * with linear search every time we surrender the stack back to fibersPool.
87      * 0 disables stack recording.
88      */
89     size_t recordStackEvery{0};
90
91     /**
92      * Keep at most this many free fibers in the pool.
93      * This way the total number of fibers in the system is always bounded
94      * by the number of active fibers + maxFibersPoolSize.
95      */
96     size_t maxFibersPoolSize{1000};
97
98     /**
99      * Protect limited amount of fiber stacks with guard pages.
100      */
101     bool useGuardPages{false};
102
103     /**
104      * Free unnecessary fibers in the fibers pool every fibersPoolResizePeriodMs
105      * milliseconds. If value is 0, periodic resizing of the fibers pool is
106      * disabled.
107      */
108     uint32_t fibersPoolResizePeriodMs{0};
109
110     constexpr Options() {}
111   };
112
113   typedef std::function<void(std::exception_ptr, std::string)>
114   ExceptionCallback;
115
116   FiberManager(const FiberManager&) = delete;
117   FiberManager& operator=(const FiberManager&) = delete;
118
119   /**
120    * Initializes, but doesn't start FiberManager loop
121    *
122    * @param loopController
123    * @param options FiberManager options
124    */
125   explicit FiberManager(std::unique_ptr<LoopController> loopController,
126                         Options options = Options());
127
128   /**
129    * Initializes, but doesn't start FiberManager loop
130    *
131    * @param loopController
132    * @param options FiberManager options
133    * @tparam LocalT only local of this type may be stored on fibers.
134    *                Locals of other types will be considered thread-locals.
135    */
136   template <typename LocalT>
137   FiberManager(LocalType<LocalT>,
138                std::unique_ptr<LoopController> loopController,
139                Options options = Options());
140
141
142   ~FiberManager();
143
144   /**
145    * Controller access.
146    */
147   LoopController& loopController();
148   const LoopController& loopController() const;
149
150   /**
151    * Keeps running ready tasks until the list of ready tasks is empty.
152    *
153    * @return True if there are any waiting tasks remaining.
154    */
155   bool loopUntilNoReady();
156
157   /**
158    * @return true if there are outstanding tasks.
159    */
160   bool hasTasks() const;
161
162   /**
163    * Sets exception callback which will be called if any of the tasks throws an
164    * exception.
165    *
166    * @param ec
167    */
168   void setExceptionCallback(ExceptionCallback ec);
169
170   /**
171    * Add a new task to be executed. Must be called from FiberManager's thread.
172    *
173    * @param func Task functor; must have a signature of `void func()`.
174    *             The object will be destroyed once task execution is complete.
175    */
176   template <typename F>
177   void addTask(F&& func);
178
179   /**
180    * Add a new task to be executed. Safe to call from other threads.
181    *
182    * @param func Task function; must have a signature of `void func()`.
183    *             The object will be destroyed once task execution is complete.
184    */
185   template <typename F>
186   void addTaskRemote(F&& func);
187
188   // Executor interface calls addTaskRemote
189   void add(std::function<void()> f) {
190     addTaskRemote(std::move(f));
191   }
192
193   /**
194    * Add a new task. When the task is complete, execute finally(Try<Result>&&)
195    * on the main context.
196    *
197    * @param func Task functor; must have a signature of `T func()` for some T.
198    * @param finally Finally functor; must have a signature of
199    *                `void finally(Try<T>&&)` and will be passed
200    *                the result of func() (including the exception if occurred).
201    */
202   template <typename F, typename G>
203   void addTaskFinally(F&& func, G&& finally);
204
205   /**
206    * If called from a fiber, immediately switches to the FiberManager's context
207    * and runs func(), going back to the Fiber's context after completion.
208    * Outside a fiber, just calls func() directly.
209    *
210    * @return value returned by func().
211    */
212   template <typename F>
213   typename std::result_of<F()>::type
214   runInMainContext(F&& func);
215
216   /**
217    * Returns a refference to a fiber-local context for given Fiber. Should be
218    * always called with the same T for each fiber. Fiber-local context is lazily
219    * default-constructed on first request.
220    * When new task is scheduled via addTask / addTaskRemote from a fiber its
221    * fiber-local context is copied into the new fiber.
222    */
223   template <typename T>
224   T& local();
225
226   template <typename T>
227   static T& localThread();
228
229   /**
230    * @return How many fiber objects (and stacks) has this manager allocated.
231    */
232   size_t fibersAllocated() const;
233
234   /**
235    * @return How many of the allocated fiber objects are currently
236    * in the free pool.
237    */
238   size_t fibersPoolSize() const;
239
240   /**
241    * return     true if running activeFiber_ is not nullptr.
242    */
243   bool hasActiveFiber() const;
244
245   /**
246    * @return The currently running fiber or null if no fiber is executing.
247    */
248   Fiber* currentFiber() const {
249     return currentFiber_;
250   }
251
252   /**
253    * @return What was the most observed fiber stack usage (in bytes).
254    */
255   size_t stackHighWatermark() const;
256
257   /**
258    * Yield execution of the currently running fiber. Must only be called from a
259    * fiber executing on this FiberManager. The calling fiber will be scheduled
260    * when all other fibers have had a chance to run and the event loop is
261    * serviced.
262    */
263   void yield();
264
265   /**
266    * Setup fibers execution observation/instrumentation. Fiber locals are
267    * available to observer.
268    *
269    * @param observer  Fiber's execution observer.
270    */
271   void setObserver(ExecutionObserver* observer);
272
273   /**
274    * Setup fibers preempt runner.
275    */
276   void setPreemptRunner(InlineFunctionRunner* preemptRunner);
277
278   /**
279    * Returns an estimate of the number of fibers which are waiting to run (does
280    * not include fibers or tasks scheduled remotely).
281    */
282   size_t runQueueSize() const {
283     return readyFibers_.size() + yieldedFibers_.size();
284   }
285
286   static FiberManager& getFiberManager();
287   static FiberManager* getFiberManagerUnsafe();
288
289  private:
290   friend class Baton;
291   friend class Fiber;
292   template <typename F>
293   struct AddTaskHelper;
294   template <typename F, typename G>
295   struct AddTaskFinallyHelper;
296
297   struct RemoteTask {
298     template <typename F>
299     explicit RemoteTask(F&& f) :
300         func(std::forward<F>(f)),
301         rcontext(RequestContext::saveContext()) {}
302     template <typename F>
303     RemoteTask(F&& f, const Fiber::LocalData& localData_) :
304         func(std::forward<F>(f)),
305         localData(folly::make_unique<Fiber::LocalData>(localData_)),
306         rcontext(RequestContext::saveContext()) {}
307     std::function<void()> func;
308     std::unique_ptr<Fiber::LocalData> localData;
309     std::shared_ptr<RequestContext> rcontext;
310     AtomicIntrusiveLinkedListHook<RemoteTask> nextRemoteTask;
311   };
312
313   typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
314
315   Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
316   /**
317    * Same as active fiber, but also set for functions run from fiber on main
318    * context.
319    */
320   Fiber* currentFiber_{nullptr};
321
322   FiberTailQueue readyFibers_;  /**< queue of fibers ready to be executed */
323   FiberTailQueue yieldedFibers_;  /**< queue of fibers which have yielded
324                                        execution */
325   FiberTailQueue fibersPool_;   /**< pool of unitialized Fiber objects */
326
327   size_t fibersAllocated_{0};   /**< total number of fibers allocated */
328   size_t fibersPoolSize_{0};    /**< total number of fibers in the free pool */
329   size_t fibersActive_{0};      /**< number of running or blocked fibers */
330   size_t fiberId_{0};           /**< id of last fiber used */
331
332   /**
333    * Maximum number of active fibers in the last period lasting
334    * Options::fibersPoolResizePeriod milliseconds.
335    */
336   size_t maxFibersActiveLastPeriod_{0};
337
338   FContext::ContextStruct mainContext_;  /**< stores loop function context */
339
340   std::unique_ptr<LoopController> loopController_;
341   bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */
342
343   /**
344    * When we are inside FiberManager loop this points to FiberManager. Otherwise
345    * it's nullptr
346    */
347   static FOLLY_TLS FiberManager* currentFiberManager_;
348
349   /**
350    * runInMainContext implementation for non-void functions.
351    */
352   template <typename F>
353   typename std::enable_if<
354     !std::is_same<typename std::result_of<F()>::type, void>::value,
355     typename std::result_of<F()>::type>::type
356   runInMainContextHelper(F&& func);
357
358   /**
359    * runInMainContext implementation for void functions
360    */
361   template <typename F>
362   typename std::enable_if<
363     std::is_same<typename std::result_of<F()>::type, void>::value,
364     void>::type
365   runInMainContextHelper(F&& func);
366
367   /**
368    * Allocator used to allocate stack for Fibers in the pool.
369    * Allocates stack on the stack of the main context.
370    */
371   GuardPageAllocator stackAllocator_;
372
373   const Options options_;       /**< FiberManager options */
374
375   /**
376    * Largest observed individual Fiber stack usage in bytes.
377    */
378   size_t stackHighWatermark_{0};
379
380   /**
381    * Schedules a loop with loopController (unless already scheduled before).
382    */
383   void ensureLoopScheduled();
384
385   /**
386    * @return An initialized Fiber object from the pool
387    */
388   Fiber* getFiber();
389
390   /**
391    * Sets local data for given fiber if all conditions are met.
392    */
393   void initLocalData(Fiber& fiber);
394
395   /**
396    * Function passed to the await call.
397    */
398   std::function<void(Fiber&)> awaitFunc_;
399
400   /**
401    * Function passed to the runInMainContext call.
402    */
403   std::function<void()> immediateFunc_;
404
405   /**
406    * Preempt runner.
407    */
408   InlineFunctionRunner* preemptRunner_{nullptr};
409
410   /**
411    * Fiber's execution observer.
412    */
413   ExecutionObserver* observer_{nullptr};
414
415   ExceptionCallback exceptionCallback_; /**< task exception callback */
416
417   folly::AtomicIntrusiveLinkedList<Fiber, &Fiber::nextRemoteReady_>
418       remoteReadyQueue_;
419
420   folly::AtomicIntrusiveLinkedList<RemoteTask, &RemoteTask::nextRemoteTask>
421       remoteTaskQueue_;
422
423   std::shared_ptr<TimeoutController> timeoutManager_;
424
425   struct FibersPoolResizer {
426     explicit FibersPoolResizer(FiberManager& fm) :
427       fiberManager_(fm) {}
428     void operator()();
429    private:
430     FiberManager& fiberManager_;
431   };
432
433   FibersPoolResizer fibersPoolResizer_;
434   bool fibersPoolResizerScheduled_{false};
435
436   void doFibersPoolResizing();
437
438   /**
439    * Only local of this type will be available for fibers.
440    */
441   std::type_index localType_;
442
443   void runReadyFiber(Fiber* fiber);
444   void remoteReadyInsert(Fiber* fiber);
445 };
446
447 /**
448  * @return      true iff we are running in a fiber's context
449  */
450 inline bool onFiber() {
451   auto fm = FiberManager::getFiberManagerUnsafe();
452   return fm ? fm->hasActiveFiber() : false;
453 }
454
455 /**
456  * Add a new task to be executed.
457  *
458  * @param func Task functor; must have a signature of `void func()`.
459  *             The object will be destroyed once task execution is complete.
460  */
461 template <typename F>
462 inline void addTask(F&& func) {
463   return FiberManager::getFiberManager().addTask(std::forward<F>(func));
464 }
465
466 /**
467  * Add a new task. When the task is complete, execute finally(Try<Result>&&)
468  * on the main context.
469  * Task functor is run and destroyed on the fiber context.
470  * Finally functor is run and destroyed on the main context.
471  *
472  * @param func Task functor; must have a signature of `T func()` for some T.
473  * @param finally Finally functor; must have a signature of
474  *                `void finally(Try<T>&&)` and will be passed
475  *                the result of func() (including the exception if occurred).
476  */
477 template <typename F, typename G>
478 inline void addTaskFinally(F&& func, G&& finally) {
479   return FiberManager::getFiberManager().addTaskFinally(
480     std::forward<F>(func), std::forward<G>(finally));
481 }
482
483 /**
484  * Blocks task execution until given promise is fulfilled.
485  *
486  * Calls function passing in a Promise<T>, which has to be fulfilled.
487  *
488  * @return data which was used to fulfill the promise.
489  */
490 template <typename F>
491 typename FirstArgOf<F>::type::value_type
492 inline await(F&& func);
493
494 /**
495  * If called from a fiber, immediately switches to the FiberManager's context
496  * and runs func(), going back to the Fiber's context after completion.
497  * Outside a fiber, just calls func() directly.
498  *
499  * @return value returned by func().
500  */
501 template <typename F>
502 typename std::result_of<F()>::type
503 inline runInMainContext(F&& func) {
504   auto fm = FiberManager::getFiberManagerUnsafe();
505   if (UNLIKELY(fm == nullptr)) {
506     return func();
507   }
508   return fm->runInMainContext(std::forward<F>(func));
509 }
510
511 /**
512  * Returns a refference to a fiber-local context for given Fiber. Should be
513  * always called with the same T for each fiber. Fiber-local context is lazily
514  * default-constructed on first request.
515  * When new task is scheduled via addTask / addTaskRemote from a fiber its
516  * fiber-local context is copied into the new fiber.
517  */
518 template <typename T>
519 T& local() {
520   auto fm = FiberManager::getFiberManagerUnsafe();
521   if (fm) {
522     return fm->local<T>();
523   }
524   return FiberManager::localThread<T>();
525 }
526
527 inline void yield() {
528   auto fm = FiberManager::getFiberManagerUnsafe();
529   if (fm) {
530     fm->yield();
531   } else {
532     std::this_thread::yield();
533   }
534 }
535
536 }}
537
538 #include "FiberManager-inl.h"