Simplify the API exposed by BoostContextCompatibility
[folly.git] / folly / fibers / FiberManagerInternal-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
25 #ifdef __APPLE__
26 #include <folly/ThreadLocal.h>
27 #endif
28 #include <folly/fibers/Baton.h>
29 #include <folly/fibers/Fiber.h>
30 #include <folly/fibers/LoopController.h>
31 #include <folly/fibers/Promise.h>
32 #include <folly/Try.h>
33
34 namespace folly {
35 namespace fibers {
36
37 namespace {
38
39 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
40 #ifdef FOLLY_SANITIZE_ADDRESS
41   /* ASAN needs a lot of extra stack space.
42      16x is a conservative estimate, 8x also worked with tests
43      where it mattered.  Note that overallocating here does not necessarily
44      increase RSS, since unused memory is pretty much free. */
45   opts.stackSize *= 16;
46 #endif
47   return opts;
48 }
49
50 } // anonymous
51
52 inline void FiberManager::ensureLoopScheduled() {
53   if (isLoopScheduled_) {
54     return;
55   }
56
57   isLoopScheduled_ = true;
58   loopController_->schedule();
59 }
60
61 inline void FiberManager::activateFiber(Fiber* fiber) {
62   DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
63
64 #ifdef FOLLY_SANITIZE_ADDRESS
65   DCHECK(!fiber->asanMainStackBase_);
66   DCHECK(!fiber->asanMainStackSize_);
67   auto stack = fiber->getStack();
68   void* asanFakeStack;
69   registerStartSwitchStackWithAsan(&asanFakeStack, stack.first, stack.second);
70   SCOPE_EXIT {
71     registerFinishSwitchStackWithAsan(asanFakeStack, nullptr, nullptr);
72     fiber->asanMainStackBase_ = nullptr;
73     fiber->asanMainStackSize_ = 0;
74   };
75 #endif
76
77   activeFiber_ = fiber;
78   fiber->fiberImpl_.activate();
79 }
80
81 inline void FiberManager::deactivateFiber(Fiber* fiber) {
82   DCHECK_EQ(activeFiber_, fiber);
83
84 #ifdef FOLLY_SANITIZE_ADDRESS
85   DCHECK(fiber->asanMainStackBase_);
86   DCHECK(fiber->asanMainStackSize_);
87
88   // Release fake stack if fiber is completed
89   auto saveFakeStackPtr =
90       fiber->state_ == Fiber::INVALID ? nullptr : &fiber->asanFakeStack_;
91   registerStartSwitchStackWithAsan(
92       saveFakeStackPtr, fiber->asanMainStackBase_, fiber->asanMainStackSize_);
93   SCOPE_EXIT {
94     registerFinishSwitchStackWithAsan(
95         fiber->asanFakeStack_,
96         &fiber->asanMainStackBase_,
97         &fiber->asanMainStackSize_);
98     fiber->asanFakeStack_ = nullptr;
99   };
100 #endif
101
102   activeFiber_ = nullptr;
103   fiber->fiberImpl_.deactivate();
104 }
105
106 inline void FiberManager::runReadyFiber(Fiber* fiber) {
107   SCOPE_EXIT {
108     assert(currentFiber_ == nullptr);
109     assert(activeFiber_ == nullptr);
110   };
111
112   assert(
113       fiber->state_ == Fiber::NOT_STARTED ||
114       fiber->state_ == Fiber::READY_TO_RUN);
115   currentFiber_ = fiber;
116   fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
117   if (observer_) {
118     observer_->starting(reinterpret_cast<uintptr_t>(fiber));
119   }
120
121   while (fiber->state_ == Fiber::NOT_STARTED ||
122          fiber->state_ == Fiber::READY_TO_RUN) {
123     activateFiber(fiber);
124     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
125       try {
126         immediateFunc_();
127       } catch (...) {
128         exceptionCallback_(std::current_exception(), "running immediateFunc_");
129       }
130       immediateFunc_ = nullptr;
131       fiber->state_ = Fiber::READY_TO_RUN;
132     }
133   }
134
135   if (fiber->state_ == Fiber::AWAITING) {
136     awaitFunc_(*fiber);
137     awaitFunc_ = nullptr;
138     if (observer_) {
139       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
140     }
141     currentFiber_ = nullptr;
142     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
143   } else if (fiber->state_ == Fiber::INVALID) {
144     assert(fibersActive_ > 0);
145     --fibersActive_;
146     // Making sure that task functor is deleted once task is complete.
147     // NOTE: we must do it on main context, as the fiber is not
148     // running at this point.
149     fiber->func_ = nullptr;
150     fiber->resultFunc_ = nullptr;
151     if (fiber->finallyFunc_) {
152       try {
153         fiber->finallyFunc_();
154       } catch (...) {
155         exceptionCallback_(std::current_exception(), "running finallyFunc_");
156       }
157       fiber->finallyFunc_ = nullptr;
158     }
159     // Make sure LocalData is not accessible from its destructor
160     if (observer_) {
161       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
162     }
163     currentFiber_ = nullptr;
164     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
165     fiber->localData_.reset();
166     fiber->rcontext_.reset();
167
168     if (fibersPoolSize_ < options_.maxFibersPoolSize ||
169         options_.fibersPoolResizePeriodMs > 0) {
170       fibersPool_.push_front(*fiber);
171       ++fibersPoolSize_;
172     } else {
173       delete fiber;
174       assert(fibersAllocated_ > 0);
175       --fibersAllocated_;
176     }
177   } else if (fiber->state_ == Fiber::YIELDED) {
178     if (observer_) {
179       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
180     }
181     currentFiber_ = nullptr;
182     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
183     fiber->state_ = Fiber::READY_TO_RUN;
184     yieldedFibers_.push_back(*fiber);
185   }
186 }
187
188 inline bool FiberManager::loopUntilNoReady() {
189 #ifndef _WIN32
190   if (UNLIKELY(!alternateSignalStackRegistered_)) {
191     registerAlternateSignalStack();
192   }
193 #endif
194
195   // Support nested FiberManagers
196   auto originalFiberManager = this;
197   std::swap(currentFiberManager_, originalFiberManager);
198
199   SCOPE_EXIT {
200     isLoopScheduled_ = false;
201     if (!readyFibers_.empty()) {
202       ensureLoopScheduled();
203     }
204     std::swap(currentFiberManager_, originalFiberManager);
205     CHECK_EQ(this, originalFiberManager);
206   };
207
208   bool hadRemoteFiber = true;
209   while (hadRemoteFiber) {
210     hadRemoteFiber = false;
211
212     while (!readyFibers_.empty()) {
213       auto& fiber = readyFibers_.front();
214       readyFibers_.pop_front();
215       runReadyFiber(&fiber);
216     }
217
218     remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
219       runReadyFiber(fiber);
220       hadRemoteFiber = true;
221     });
222
223     remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
224       std::unique_ptr<RemoteTask> task(taskPtr);
225       auto fiber = getFiber();
226       if (task->localData) {
227         fiber->localData_ = *task->localData;
228       }
229       fiber->rcontext_ = std::move(task->rcontext);
230
231       fiber->setFunction(std::move(task->func));
232       if (observer_) {
233         observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
234       }
235       runReadyFiber(fiber);
236       hadRemoteFiber = true;
237     });
238   }
239
240   if (observer_) {
241     for (auto& yielded : yieldedFibers_) {
242       observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
243     }
244   }
245   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
246
247   return fibersActive_ > 0;
248 }
249
250 // We need this to be in a struct, not inlined in addTask, because clang crashes
251 // otherwise.
252 template <typename F>
253 struct FiberManager::AddTaskHelper {
254   class Func;
255
256   static constexpr bool allocateInBuffer =
257       sizeof(Func) <= Fiber::kUserBufferSize;
258
259   class Func {
260    public:
261     Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
262
263     void operator()() {
264       try {
265         func_();
266       } catch (...) {
267         fm_.exceptionCallback_(
268             std::current_exception(), "running Func functor");
269       }
270       if (allocateInBuffer) {
271         this->~Func();
272       } else {
273         delete this;
274       }
275     }
276
277    private:
278     F func_;
279     FiberManager& fm_;
280   };
281 };
282
283 template <typename F>
284 void FiberManager::addTask(F&& func) {
285   typedef AddTaskHelper<F> Helper;
286
287   auto fiber = getFiber();
288   initLocalData(*fiber);
289
290   if (Helper::allocateInBuffer) {
291     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
292     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
293
294     fiber->setFunction(std::ref(*funcLoc));
295   } else {
296     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
297
298     fiber->setFunction(std::ref(*funcLoc));
299   }
300
301   readyFibers_.push_back(*fiber);
302   if (observer_) {
303     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
304   }
305
306   ensureLoopScheduled();
307 }
308
309 template <typename F>
310 void FiberManager::addTaskRemote(F&& func) {
311   auto task = [&]() {
312     auto currentFm = getFiberManagerUnsafe();
313     if (currentFm && currentFm->currentFiber_ &&
314         currentFm->localType_ == localType_) {
315       return folly::make_unique<RemoteTask>(
316           std::forward<F>(func), currentFm->currentFiber_->localData_);
317     }
318     return folly::make_unique<RemoteTask>(std::forward<F>(func));
319   }();
320   auto insertHead = [&]() {
321     return remoteTaskQueue_.insertHead(task.release());
322   };
323   loopController_->scheduleThreadSafe(std::ref(insertHead));
324 }
325
326 template <typename X>
327 struct IsRvalueRefTry {
328   static const bool value = false;
329 };
330 template <typename T>
331 struct IsRvalueRefTry<folly::Try<T>&&> {
332   static const bool value = true;
333 };
334
335 // We need this to be in a struct, not inlined in addTaskFinally, because clang
336 // crashes otherwise.
337 template <typename F, typename G>
338 struct FiberManager::AddTaskFinallyHelper {
339   class Func;
340
341   typedef typename std::result_of<F()>::type Result;
342
343   class Finally {
344    public:
345     Finally(G finally, FiberManager& fm)
346         : finally_(std::move(finally)), fm_(fm) {}
347
348     void operator()() {
349       try {
350         finally_(std::move(*result_));
351       } catch (...) {
352         fm_.exceptionCallback_(
353             std::current_exception(), "running Finally functor");
354       }
355
356       if (allocateInBuffer) {
357         this->~Finally();
358       } else {
359         delete this;
360       }
361     }
362
363    private:
364     friend class Func;
365
366     G finally_;
367     folly::Optional<folly::Try<Result>> result_;
368     FiberManager& fm_;
369   };
370
371   class Func {
372    public:
373     Func(F func, Finally& finally)
374         : func_(std::move(func)), result_(finally.result_) {}
375
376     void operator()() {
377       result_ = folly::makeTryWith(std::move(func_));
378
379       if (allocateInBuffer) {
380         this->~Func();
381       } else {
382         delete this;
383       }
384     }
385
386    private:
387     F func_;
388     folly::Optional<folly::Try<Result>>& result_;
389   };
390
391   static constexpr bool allocateInBuffer =
392       sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
393 };
394
395 template <typename F, typename G>
396 void FiberManager::addTaskFinally(F&& func, G&& finally) {
397   typedef typename std::result_of<F()>::type Result;
398
399   static_assert(
400       IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
401       "finally(arg): arg must be Try<T>&&");
402   static_assert(
403       std::is_convertible<
404           Result,
405           typename std::remove_reference<
406               typename FirstArgOf<G>::type>::type::element_type>::value,
407       "finally(Try<T>&&): T must be convertible from func()'s return type");
408
409   auto fiber = getFiber();
410   initLocalData(*fiber);
411
412   typedef AddTaskFinallyHelper<
413       typename std::decay<F>::type,
414       typename std::decay<G>::type>
415       Helper;
416
417   if (Helper::allocateInBuffer) {
418     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
419     auto finallyLoc =
420         static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
421
422     new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
423     new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
424
425     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
426   } else {
427     auto finallyLoc =
428         new typename Helper::Finally(std::forward<G>(finally), *this);
429     auto funcLoc =
430         new typename Helper::Func(std::forward<F>(func), *finallyLoc);
431
432     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
433   }
434
435   readyFibers_.push_back(*fiber);
436   if (observer_) {
437     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
438   }
439
440   ensureLoopScheduled();
441 }
442
443 template <typename F>
444 typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
445   if (UNLIKELY(activeFiber_ == nullptr)) {
446     return func();
447   }
448
449   typedef typename std::result_of<F()>::type Result;
450
451   folly::Try<Result> result;
452   auto f = [&func, &result]() mutable {
453     result = folly::makeTryWith(std::forward<F>(func));
454   };
455
456   immediateFunc_ = std::ref(f);
457   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
458
459   return std::move(result).value();
460 }
461
462 inline FiberManager& FiberManager::getFiberManager() {
463   assert(currentFiberManager_ != nullptr);
464   return *currentFiberManager_;
465 }
466
467 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
468   return currentFiberManager_;
469 }
470
471 inline bool FiberManager::hasActiveFiber() const {
472   return activeFiber_ != nullptr;
473 }
474
475 inline void FiberManager::yield() {
476   assert(currentFiberManager_ == this);
477   assert(activeFiber_ != nullptr);
478   assert(activeFiber_->state_ == Fiber::RUNNING);
479   activeFiber_->preempt(Fiber::YIELDED);
480 }
481
482 template <typename T>
483 T& FiberManager::local() {
484   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
485     return currentFiber_->localData_.get<T>();
486   }
487   return localThread<T>();
488 }
489
490 template <typename T>
491 T& FiberManager::localThread() {
492 #ifndef __APPLE__
493   static thread_local T t;
494   return t;
495 #else // osx doesn't support thread_local
496   static ThreadLocal<T> t;
497   return *t;
498 #endif
499 }
500
501 inline void FiberManager::initLocalData(Fiber& fiber) {
502   auto fm = getFiberManagerUnsafe();
503   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
504     fiber.localData_ = fm->currentFiber_->localData_;
505   }
506   fiber.rcontext_ = RequestContext::saveContext();
507 }
508
509 template <typename LocalT>
510 FiberManager::FiberManager(
511     LocalType<LocalT>,
512     std::unique_ptr<LoopController> loopController__,
513     Options options)
514     : loopController_(std::move(loopController__)),
515       stackAllocator_(options.useGuardPages),
516       options_(preprocessOptions(std::move(options))),
517       exceptionCallback_([](std::exception_ptr eptr, std::string context) {
518         try {
519           std::rethrow_exception(eptr);
520         } catch (const std::exception& e) {
521           LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
522                       << e.what() << "' was thrown in "
523                       << "FiberManager with context '" << context << "'";
524         } catch (...) {
525           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
526                       << "context '" << context << "'";
527         }
528       }),
529       timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
530       fibersPoolResizer_(*this),
531       localType_(typeid(LocalT)) {
532   loopController_->setFiberManager(this);
533 }
534
535 template <typename F>
536 typename FirstArgOf<F>::type::value_type inline await(F&& func) {
537   typedef typename FirstArgOf<F>::type::value_type Result;
538   typedef typename FirstArgOf<F>::type::baton_type BatonT;
539
540   return Promise<Result, BatonT>::await(std::forward<F>(func));
541 }
542 }
543 }