Add a default timeout parameter to HHWheelTimer.
[folly.git] / folly / experimental / fibers / FiberManager-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
25 #ifdef __APPLE__
26 #include <folly/ThreadLocal.h>
27 #endif
28 #include <folly/experimental/fibers/Baton.h>
29 #include <folly/experimental/fibers/Fiber.h>
30 #include <folly/experimental/fibers/LoopController.h>
31 #include <folly/experimental/fibers/Promise.h>
32 #include <folly/futures/Try.h>
33
34 namespace folly { namespace fibers {
35
36 namespace {
37
38 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
39 #ifdef FOLLY_SANITIZE_ADDRESS
40   /* ASAN needs a lot of extra stack space.
41      16x is a conservative estimate, 8x also worked with tests
42      where it mattered.  Note that overallocating here does not necessarily
43      increase RSS, since unused memory is pretty much free. */
44   opts.stackSize *= 16;
45 #endif
46   return opts;
47 }
48
49 }  // anonymous
50
51 inline void FiberManager::ensureLoopScheduled() {
52   if (isLoopScheduled_) {
53     return;
54   }
55
56   isLoopScheduled_ = true;
57   loopController_->schedule();
58 }
59
60 inline void FiberManager::runReadyFiber(Fiber* fiber) {
61   SCOPE_EXIT {
62     assert(currentFiber_ == nullptr);
63     assert(activeFiber_ == nullptr);
64   };
65
66   assert(fiber->state_ == Fiber::NOT_STARTED ||
67          fiber->state_ == Fiber::READY_TO_RUN);
68   currentFiber_ = fiber;
69   fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
70   if (observer_) {
71     observer_->starting(reinterpret_cast<uintptr_t>(fiber));
72   }
73
74   while (fiber->state_ == Fiber::NOT_STARTED ||
75          fiber->state_ == Fiber::READY_TO_RUN) {
76     activeFiber_ = fiber;
77     jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
78     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
79       try {
80         immediateFunc_();
81       } catch (...) {
82         exceptionCallback_(std::current_exception(), "running immediateFunc_");
83       }
84       immediateFunc_ = nullptr;
85       fiber->state_ = Fiber::READY_TO_RUN;
86     }
87   }
88
89   if (fiber->state_ == Fiber::AWAITING) {
90     awaitFunc_(*fiber);
91     awaitFunc_ = nullptr;
92     if (observer_) {
93       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
94     }
95     currentFiber_ = nullptr;
96     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
97   } else if (fiber->state_ == Fiber::INVALID) {
98     assert(fibersActive_ > 0);
99     --fibersActive_;
100     // Making sure that task functor is deleted once task is complete.
101     // NOTE: we must do it on main context, as the fiber is not
102     // running at this point.
103     fiber->func_ = nullptr;
104     fiber->resultFunc_ = nullptr;
105     if (fiber->finallyFunc_) {
106       try {
107         fiber->finallyFunc_();
108       } catch (...) {
109         exceptionCallback_(std::current_exception(), "running finallyFunc_");
110       }
111       fiber->finallyFunc_ = nullptr;
112     }
113     // Make sure LocalData is not accessible from its destructor
114     if (observer_) {
115       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
116     }
117     currentFiber_ = nullptr;
118     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
119     fiber->localData_.reset();
120     fiber->rcontext_.reset();
121
122     if (fibersPoolSize_ < options_.maxFibersPoolSize) {
123       fibersPool_.push_front(*fiber);
124       ++fibersPoolSize_;
125     } else {
126       delete fiber;
127       assert(fibersAllocated_ > 0);
128       --fibersAllocated_;
129     }
130   } else if (fiber->state_ == Fiber::YIELDED) {
131     if (observer_) {
132       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
133     }
134     currentFiber_ = nullptr;
135     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
136     fiber->state_ = Fiber::READY_TO_RUN;
137     yieldedFibers_.push_back(*fiber);
138   }
139 }
140
141 inline bool FiberManager::loopUntilNoReady() {
142   SCOPE_EXIT {
143     isLoopScheduled_ = false;
144     if (!readyFibers_.empty()) {
145       ensureLoopScheduled();
146     }
147     currentFiberManager_ = nullptr;
148   };
149
150   currentFiberManager_ = this;
151
152   bool hadRemoteFiber = true;
153   while (hadRemoteFiber) {
154     hadRemoteFiber = false;
155
156     while (!readyFibers_.empty()) {
157       auto& fiber = readyFibers_.front();
158       readyFibers_.pop_front();
159       runReadyFiber(&fiber);
160     }
161
162     remoteReadyQueue_.sweep(
163       [this, &hadRemoteFiber] (Fiber* fiber) {
164         runReadyFiber(fiber);
165         hadRemoteFiber = true;
166       }
167     );
168
169     remoteTaskQueue_.sweep(
170       [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
171         std::unique_ptr<RemoteTask> task(taskPtr);
172         auto fiber = getFiber();
173         if (task->localData) {
174           fiber->localData_ = *task->localData;
175         }
176         fiber->rcontext_ = std::move(task->rcontext);
177
178         fiber->setFunction(std::move(task->func));
179         fiber->data_ = reinterpret_cast<intptr_t>(fiber);
180         if (observer_) {
181           observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
182         }
183         runReadyFiber(fiber);
184         hadRemoteFiber = true;
185       }
186     );
187   }
188
189   if (observer_) {
190     for (auto& yielded : yieldedFibers_) {
191       observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
192     }
193   }
194   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
195
196   return fibersActive_ > 0;
197 }
198
199 // We need this to be in a struct, not inlined in addTask, because clang crashes
200 // otherwise.
201 template <typename F>
202 struct FiberManager::AddTaskHelper {
203   class Func;
204
205   static constexpr bool allocateInBuffer =
206     sizeof(Func) <= Fiber::kUserBufferSize;
207
208   class Func {
209    public:
210     Func(F&& func, FiberManager& fm) :
211         func_(std::forward<F>(func)), fm_(fm) {}
212
213     void operator()() {
214       try {
215         func_();
216       } catch (...) {
217         fm_.exceptionCallback_(std::current_exception(),
218                                "running Func functor");
219       }
220       if (allocateInBuffer) {
221         this->~Func();
222       } else {
223         delete this;
224       }
225     }
226
227    private:
228     F func_;
229     FiberManager& fm_;
230   };
231 };
232
233 template <typename F>
234 void FiberManager::addTask(F&& func) {
235   typedef AddTaskHelper<F> Helper;
236
237   auto fiber = getFiber();
238   initLocalData(*fiber);
239
240   if (Helper::allocateInBuffer) {
241     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
242     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
243
244     fiber->setFunction(std::ref(*funcLoc));
245   } else {
246     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
247
248     fiber->setFunction(std::ref(*funcLoc));
249   }
250
251   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
252   readyFibers_.push_back(*fiber);
253   if (observer_) {
254     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
255   }
256
257   ensureLoopScheduled();
258 }
259
260 template <typename F>
261 void FiberManager::addTaskRemote(F&& func) {
262   auto task = [&]() {
263     auto currentFm = getFiberManagerUnsafe();
264     if (currentFm &&
265         currentFm->currentFiber_ &&
266         currentFm->localType_ == localType_) {
267       return folly::make_unique<RemoteTask>(
268         std::forward<F>(func),
269         currentFm->currentFiber_->localData_);
270     }
271     return folly::make_unique<RemoteTask>(std::forward<F>(func));
272   }();
273   if (remoteTaskQueue_.insertHead(task.release())) {
274     loopController_->scheduleThreadSafe();
275   }
276 }
277
278 template <typename X>
279 struct IsRvalueRefTry { static const bool value = false; };
280 template <typename T>
281 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
282
283 // We need this to be in a struct, not inlined in addTaskFinally, because clang
284 // crashes otherwise.
285 template <typename F, typename G>
286 struct FiberManager::AddTaskFinallyHelper {
287   class Func;
288   class Finally;
289
290   typedef typename std::result_of<F()>::type Result;
291
292   static constexpr bool allocateInBuffer =
293     sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
294
295   class Finally {
296    public:
297     Finally(G&& finally,
298             FiberManager& fm) :
299         finally_(std::move(finally)),
300         fm_(fm) {
301     }
302
303     void operator()() {
304       try {
305         finally_(std::move(*result_));
306       } catch (...) {
307         fm_.exceptionCallback_(std::current_exception(),
308                                "running Finally functor");
309       }
310
311       if (allocateInBuffer) {
312         this->~Finally();
313       } else {
314         delete this;
315       }
316     }
317
318    private:
319     friend class Func;
320
321     G finally_;
322     folly::Optional<folly::Try<Result>> result_;
323     FiberManager& fm_;
324   };
325
326   class Func {
327    public:
328     Func(F&& func, Finally& finally) :
329         func_(std::move(func)), result_(finally.result_) {}
330
331     void operator()() {
332       result_ = folly::makeTryWith(std::move(func_));
333
334       if (allocateInBuffer) {
335         this->~Func();
336       } else {
337         delete this;
338       }
339     }
340
341    private:
342     F func_;
343     folly::Optional<folly::Try<Result>>& result_;
344   };
345 };
346
347 template <typename F, typename G>
348 void FiberManager::addTaskFinally(F&& func, G&& finally) {
349   typedef typename std::result_of<F()>::type Result;
350
351   static_assert(
352     IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
353     "finally(arg): arg must be Try<T>&&");
354   static_assert(
355     std::is_convertible<
356       Result,
357       typename std::remove_reference<
358         typename FirstArgOf<G>::type
359       >::type::element_type
360     >::value,
361     "finally(Try<T>&&): T must be convertible from func()'s return type");
362
363   auto fiber = getFiber();
364   initLocalData(*fiber);
365
366   typedef AddTaskFinallyHelper<F,G> Helper;
367
368   if (Helper::allocateInBuffer) {
369     auto funcLoc = static_cast<typename Helper::Func*>(
370       fiber->getUserBuffer());
371     auto finallyLoc = static_cast<typename Helper::Finally*>(
372       static_cast<void*>(funcLoc + 1));
373
374     new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
375     new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
376
377     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
378   } else {
379     auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
380     auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
381
382     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
383   }
384
385   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
386   readyFibers_.push_back(*fiber);
387   if (observer_) {
388     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
389   }
390
391   ensureLoopScheduled();
392 }
393
394 template <typename F>
395 typename std::result_of<F()>::type
396 FiberManager::runInMainContext(F&& func) {
397   return runInMainContextHelper(std::forward<F>(func));
398 }
399
400 template <typename F>
401 inline typename std::enable_if<
402   !std::is_same<typename std::result_of<F()>::type, void>::value,
403   typename std::result_of<F()>::type>::type
404 FiberManager::runInMainContextHelper(F&& func) {
405   if (UNLIKELY(activeFiber_ == nullptr)) {
406     return func();
407   }
408
409   typedef typename std::result_of<F()>::type Result;
410
411   folly::Try<Result> result;
412   auto f = [&func, &result]() mutable {
413     result = folly::makeTryWith(std::forward<F>(func));
414   };
415
416   immediateFunc_ = std::ref(f);
417   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
418
419   return std::move(result.value());
420 }
421
422 template <typename F>
423 inline typename std::enable_if<
424   std::is_same<typename std::result_of<F()>::type, void>::value,
425   void>::type
426 FiberManager::runInMainContextHelper(F&& func) {
427   if (UNLIKELY(activeFiber_ == nullptr)) {
428     func();
429     return;
430   }
431
432   immediateFunc_ = std::ref(func);
433   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
434 }
435
436 inline FiberManager& FiberManager::getFiberManager() {
437   assert(currentFiberManager_ != nullptr);
438   return *currentFiberManager_;
439 }
440
441 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
442   return currentFiberManager_;
443 }
444
445 inline bool FiberManager::hasActiveFiber() const {
446   return activeFiber_ != nullptr;
447 }
448
449 inline void FiberManager::yield() {
450   assert(currentFiberManager_ == this);
451   assert(activeFiber_ != nullptr);
452   assert(activeFiber_->state_ == Fiber::RUNNING);
453   activeFiber_->preempt(Fiber::YIELDED);
454 }
455
456 template <typename T>
457 T& FiberManager::local() {
458   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
459     return currentFiber_->localData_.get<T>();
460   }
461   return localThread<T>();
462 }
463
464 template <typename T>
465 T& FiberManager::localThread() {
466 #ifndef __APPLE__
467   static thread_local T t;
468   return t;
469 #else // osx doesn't support thread_local
470   static ThreadLocal<T> t;
471   return *t;
472 #endif
473 }
474
475 inline void FiberManager::initLocalData(Fiber& fiber) {
476   auto fm = getFiberManagerUnsafe();
477   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
478     fiber.localData_ = fm->currentFiber_->localData_;
479   }
480   fiber.rcontext_ = RequestContext::saveContext();
481 }
482
483 template <typename LocalT>
484 FiberManager::FiberManager(
485   LocalType<LocalT>,
486   std::unique_ptr<LoopController> loopController__,
487   Options options)  :
488     loopController_(std::move(loopController__)),
489     stackAllocator_(options.useGuardPages),
490     options_(preprocessOptions(std::move(options))),
491     exceptionCallback_([](std::exception_ptr eptr, std::string context) {
492         try {
493           std::rethrow_exception(eptr);
494         } catch (const std::exception& e) {
495           LOG(DFATAL) << "Exception " << typeid(e).name()
496                       << " with message '" << e.what() << "' was thrown in "
497                       << "FiberManager with context '" << context << "'";
498           throw;
499         } catch (...) {
500           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
501                       << "context '" << context << "'";
502           throw;
503         }
504       }),
505     timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
506     localType_(typeid(LocalT)) {
507   loopController_->setFiberManager(this);
508 }
509
510 template <typename F>
511 typename FirstArgOf<F>::type::value_type
512 inline await(F&& func) {
513   typedef typename FirstArgOf<F>::type::value_type Result;
514
515   folly::Try<Result> result;
516
517   Baton baton;
518   baton.wait([&func, &result, &baton]() mutable {
519       func(Promise<Result>(result, baton));
520     });
521
522   return folly::moveFromTry(std::move(result));
523 }
524
525 }}