make extent_hooks static.
[folly.git] / folly / fibers / FiberManagerInternal-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
25 #ifdef __APPLE__
26 #include <folly/ThreadLocal.h>
27 #endif
28 #include <folly/fibers/Baton.h>
29 #include <folly/fibers/Fiber.h>
30 #include <folly/fibers/LoopController.h>
31 #include <folly/fibers/Promise.h>
32 #include <folly/Try.h>
33
34 namespace folly {
35 namespace fibers {
36
37 namespace {
38
39 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
40 #if defined(FOLLY_SANITIZE_ADDRESS) || defined(UNDEFINED_SANITIZER)
41   /* ASAN/UBSAN needs a lot of extra stack space.
42      16x is a conservative estimate, 8x also worked with tests
43      where it mattered.  Note that overallocating here does not necessarily
44      increase RSS, since unused memory is pretty much free. */
45   opts.stackSize *= 16;
46 #endif
47   return opts;
48 }
49
50 } // anonymous
51
52 inline void FiberManager::ensureLoopScheduled() {
53   if (isLoopScheduled_) {
54     return;
55   }
56
57   isLoopScheduled_ = true;
58   loopController_->schedule();
59 }
60
61 inline void FiberManager::activateFiber(Fiber* fiber) {
62   DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
63
64 #ifdef FOLLY_SANITIZE_ADDRESS
65   DCHECK(!fiber->asanMainStackBase_);
66   DCHECK(!fiber->asanMainStackSize_);
67   auto stack = fiber->getStack();
68   void* asanFakeStack;
69   registerStartSwitchStackWithAsan(&asanFakeStack, stack.first, stack.second);
70   SCOPE_EXIT {
71     registerFinishSwitchStackWithAsan(asanFakeStack, nullptr, nullptr);
72     fiber->asanMainStackBase_ = nullptr;
73     fiber->asanMainStackSize_ = 0;
74   };
75 #endif
76
77   activeFiber_ = fiber;
78   fiber->fiberImpl_.activate();
79 }
80
81 inline void FiberManager::deactivateFiber(Fiber* fiber) {
82   DCHECK_EQ(activeFiber_, fiber);
83
84 #ifdef FOLLY_SANITIZE_ADDRESS
85   DCHECK(fiber->asanMainStackBase_);
86   DCHECK(fiber->asanMainStackSize_);
87
88   registerStartSwitchStackWithAsan(
89       &fiber->asanFakeStack_,
90       fiber->asanMainStackBase_,
91       fiber->asanMainStackSize_);
92   SCOPE_EXIT {
93     registerFinishSwitchStackWithAsan(
94         fiber->asanFakeStack_,
95         &fiber->asanMainStackBase_,
96         &fiber->asanMainStackSize_);
97     fiber->asanFakeStack_ = nullptr;
98   };
99 #endif
100
101   activeFiber_ = nullptr;
102   fiber->fiberImpl_.deactivate();
103 }
104
105 inline void FiberManager::runReadyFiber(Fiber* fiber) {
106   SCOPE_EXIT {
107     assert(currentFiber_ == nullptr);
108     assert(activeFiber_ == nullptr);
109   };
110
111   assert(
112       fiber->state_ == Fiber::NOT_STARTED ||
113       fiber->state_ == Fiber::READY_TO_RUN);
114   currentFiber_ = fiber;
115   fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
116   if (observer_) {
117     observer_->starting(reinterpret_cast<uintptr_t>(fiber));
118   }
119
120   while (fiber->state_ == Fiber::NOT_STARTED ||
121          fiber->state_ == Fiber::READY_TO_RUN) {
122     activateFiber(fiber);
123     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
124       try {
125         immediateFunc_();
126       } catch (...) {
127         exceptionCallback_(std::current_exception(), "running immediateFunc_");
128       }
129       immediateFunc_ = nullptr;
130       fiber->state_ = Fiber::READY_TO_RUN;
131     }
132   }
133
134   if (fiber->state_ == Fiber::AWAITING) {
135     awaitFunc_(*fiber);
136     awaitFunc_ = nullptr;
137     if (observer_) {
138       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
139     }
140     currentFiber_ = nullptr;
141     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
142   } else if (fiber->state_ == Fiber::INVALID) {
143     assert(fibersActive_ > 0);
144     --fibersActive_;
145     // Making sure that task functor is deleted once task is complete.
146     // NOTE: we must do it on main context, as the fiber is not
147     // running at this point.
148     fiber->func_ = nullptr;
149     fiber->resultFunc_ = nullptr;
150     if (fiber->finallyFunc_) {
151       try {
152         fiber->finallyFunc_();
153       } catch (...) {
154         exceptionCallback_(std::current_exception(), "running finallyFunc_");
155       }
156       fiber->finallyFunc_ = nullptr;
157     }
158     // Make sure LocalData is not accessible from its destructor
159     if (observer_) {
160       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
161     }
162     currentFiber_ = nullptr;
163     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
164     fiber->localData_.reset();
165     fiber->rcontext_.reset();
166
167     if (fibersPoolSize_ < options_.maxFibersPoolSize ||
168         options_.fibersPoolResizePeriodMs > 0) {
169       fibersPool_.push_front(*fiber);
170       ++fibersPoolSize_;
171     } else {
172       delete fiber;
173       assert(fibersAllocated_ > 0);
174       --fibersAllocated_;
175     }
176   } else if (fiber->state_ == Fiber::YIELDED) {
177     if (observer_) {
178       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
179     }
180     currentFiber_ = nullptr;
181     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
182     fiber->state_ = Fiber::READY_TO_RUN;
183     yieldedFibers_.push_back(*fiber);
184   }
185 }
186
187 inline void FiberManager::loopUntilNoReady() {
188   return loopController_->runLoop();
189 }
190
191 inline void FiberManager::loopUntilNoReadyImpl() {
192 #ifndef _WIN32
193   if (UNLIKELY(!alternateSignalStackRegistered_)) {
194     registerAlternateSignalStack();
195   }
196 #endif
197
198   // Support nested FiberManagers
199   auto originalFiberManager = this;
200   std::swap(currentFiberManager_, originalFiberManager);
201
202   SCOPE_EXIT {
203     isLoopScheduled_ = false;
204     if (!readyFibers_.empty()) {
205       ensureLoopScheduled();
206     }
207     std::swap(currentFiberManager_, originalFiberManager);
208     CHECK_EQ(this, originalFiberManager);
209   };
210
211   bool hadRemoteFiber = true;
212   while (hadRemoteFiber) {
213     hadRemoteFiber = false;
214
215     while (!readyFibers_.empty()) {
216       auto& fiber = readyFibers_.front();
217       readyFibers_.pop_front();
218       runReadyFiber(&fiber);
219     }
220
221     remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
222       runReadyFiber(fiber);
223       hadRemoteFiber = true;
224     });
225
226     remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
227       std::unique_ptr<RemoteTask> task(taskPtr);
228       auto fiber = getFiber();
229       if (task->localData) {
230         fiber->localData_ = *task->localData;
231       }
232       fiber->rcontext_ = std::move(task->rcontext);
233
234       fiber->setFunction(std::move(task->func));
235       if (observer_) {
236         observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
237       }
238       runReadyFiber(fiber);
239       hadRemoteFiber = true;
240     });
241   }
242
243   if (observer_) {
244     for (auto& yielded : yieldedFibers_) {
245       observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
246     }
247   }
248   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
249 }
250
251 // We need this to be in a struct, not inlined in addTask, because clang crashes
252 // otherwise.
253 template <typename F>
254 struct FiberManager::AddTaskHelper {
255   class Func;
256
257   static constexpr bool allocateInBuffer =
258       sizeof(Func) <= Fiber::kUserBufferSize;
259
260   class Func {
261    public:
262     Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
263
264     void operator()() {
265       try {
266         func_();
267       } catch (...) {
268         fm_.exceptionCallback_(
269             std::current_exception(), "running Func functor");
270       }
271       if (allocateInBuffer) {
272         this->~Func();
273       } else {
274         delete this;
275       }
276     }
277
278    private:
279     F func_;
280     FiberManager& fm_;
281   };
282 };
283
284 template <typename F>
285 void FiberManager::addTask(F&& func) {
286   typedef AddTaskHelper<F> Helper;
287
288   auto fiber = getFiber();
289   initLocalData(*fiber);
290
291   if (Helper::allocateInBuffer) {
292     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
293     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
294
295     fiber->setFunction(std::ref(*funcLoc));
296   } else {
297     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
298
299     fiber->setFunction(std::ref(*funcLoc));
300   }
301
302   readyFibers_.push_back(*fiber);
303   if (observer_) {
304     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
305   }
306
307   ensureLoopScheduled();
308 }
309
310 template <typename F>
311 void FiberManager::addTaskRemote(F&& func) {
312   auto task = [&]() {
313     auto currentFm = getFiberManagerUnsafe();
314     if (currentFm && currentFm->currentFiber_ &&
315         currentFm->localType_ == localType_) {
316       return std::make_unique<RemoteTask>(
317           std::forward<F>(func), currentFm->currentFiber_->localData_);
318     }
319     return std::make_unique<RemoteTask>(std::forward<F>(func));
320   }();
321   auto insertHead = [&]() {
322     return remoteTaskQueue_.insertHead(task.release());
323   };
324   loopController_->scheduleThreadSafe(std::ref(insertHead));
325 }
326
327 template <typename X>
328 struct IsRvalueRefTry {
329   static const bool value = false;
330 };
331 template <typename T>
332 struct IsRvalueRefTry<folly::Try<T>&&> {
333   static const bool value = true;
334 };
335
336 // We need this to be in a struct, not inlined in addTaskFinally, because clang
337 // crashes otherwise.
338 template <typename F, typename G>
339 struct FiberManager::AddTaskFinallyHelper {
340   class Func;
341
342   typedef typename std::result_of<F()>::type Result;
343
344   class Finally {
345    public:
346     Finally(G finally, FiberManager& fm)
347         : finally_(std::move(finally)), fm_(fm) {}
348
349     void operator()() {
350       try {
351         finally_(std::move(*result_));
352       } catch (...) {
353         fm_.exceptionCallback_(
354             std::current_exception(), "running Finally functor");
355       }
356
357       if (allocateInBuffer) {
358         this->~Finally();
359       } else {
360         delete this;
361       }
362     }
363
364    private:
365     friend class Func;
366
367     G finally_;
368     folly::Optional<folly::Try<Result>> result_;
369     FiberManager& fm_;
370   };
371
372   class Func {
373    public:
374     Func(F func, Finally& finally)
375         : func_(std::move(func)), result_(finally.result_) {}
376
377     void operator()() {
378       result_ = folly::makeTryWith(std::move(func_));
379
380       if (allocateInBuffer) {
381         this->~Func();
382       } else {
383         delete this;
384       }
385     }
386
387    private:
388     F func_;
389     folly::Optional<folly::Try<Result>>& result_;
390   };
391
392   static constexpr bool allocateInBuffer =
393       sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
394 };
395
396 template <typename F, typename G>
397 void FiberManager::addTaskFinally(F&& func, G&& finally) {
398   typedef typename std::result_of<F()>::type Result;
399
400   static_assert(
401       IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
402       "finally(arg): arg must be Try<T>&&");
403   static_assert(
404       std::is_convertible<
405           Result,
406           typename std::remove_reference<
407               typename FirstArgOf<G>::type>::type::element_type>::value,
408       "finally(Try<T>&&): T must be convertible from func()'s return type");
409
410   auto fiber = getFiber();
411   initLocalData(*fiber);
412
413   typedef AddTaskFinallyHelper<
414       typename std::decay<F>::type,
415       typename std::decay<G>::type>
416       Helper;
417
418   if (Helper::allocateInBuffer) {
419     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
420     auto finallyLoc =
421         static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
422
423     new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
424     new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
425
426     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
427   } else {
428     auto finallyLoc =
429         new typename Helper::Finally(std::forward<G>(finally), *this);
430     auto funcLoc =
431         new typename Helper::Func(std::forward<F>(func), *finallyLoc);
432
433     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
434   }
435
436   readyFibers_.push_back(*fiber);
437   if (observer_) {
438     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
439   }
440
441   ensureLoopScheduled();
442 }
443
444 template <typename F>
445 typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
446   if (UNLIKELY(activeFiber_ == nullptr)) {
447     return func();
448   }
449
450   typedef typename std::result_of<F()>::type Result;
451
452   folly::Try<Result> result;
453   auto f = [&func, &result]() mutable {
454     result = folly::makeTryWith(std::forward<F>(func));
455   };
456
457   immediateFunc_ = std::ref(f);
458   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
459
460   return std::move(result).value();
461 }
462
463 inline FiberManager& FiberManager::getFiberManager() {
464   assert(currentFiberManager_ != nullptr);
465   return *currentFiberManager_;
466 }
467
468 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
469   return currentFiberManager_;
470 }
471
472 inline bool FiberManager::hasActiveFiber() const {
473   return activeFiber_ != nullptr;
474 }
475
476 inline void FiberManager::yield() {
477   assert(currentFiberManager_ == this);
478   assert(activeFiber_ != nullptr);
479   assert(activeFiber_->state_ == Fiber::RUNNING);
480   activeFiber_->preempt(Fiber::YIELDED);
481 }
482
483 template <typename T>
484 T& FiberManager::local() {
485   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
486     return currentFiber_->localData_.get<T>();
487   }
488   return localThread<T>();
489 }
490
491 template <typename T>
492 T& FiberManager::localThread() {
493 #ifndef __APPLE__
494   static thread_local T t;
495   return t;
496 #else // osx doesn't support thread_local
497   static ThreadLocal<T> t;
498   return *t;
499 #endif
500 }
501
502 inline void FiberManager::initLocalData(Fiber& fiber) {
503   auto fm = getFiberManagerUnsafe();
504   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
505     fiber.localData_ = fm->currentFiber_->localData_;
506   }
507   fiber.rcontext_ = RequestContext::saveContext();
508 }
509
510 template <typename LocalT>
511 FiberManager::FiberManager(
512     LocalType<LocalT>,
513     std::unique_ptr<LoopController> loopController__,
514     Options options)
515     : loopController_(std::move(loopController__)),
516       stackAllocator_(options.useGuardPages),
517       options_(preprocessOptions(std::move(options))),
518       exceptionCallback_([](std::exception_ptr eptr, std::string context) {
519         try {
520           std::rethrow_exception(eptr);
521         } catch (const std::exception& e) {
522           LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
523                       << e.what() << "' was thrown in "
524                       << "FiberManager with context '" << context << "'";
525         } catch (...) {
526           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
527                       << "context '" << context << "'";
528         }
529       }),
530       timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
531       fibersPoolResizer_(*this),
532       localType_(typeid(LocalT)) {
533   loopController_->setFiberManager(this);
534 }
535
536 template <typename F>
537 typename FirstArgOf<F>::type::value_type inline await(F&& func) {
538   typedef typename FirstArgOf<F>::type::value_type Result;
539   typedef typename FirstArgOf<F>::type::baton_type BatonT;
540
541   return Promise<Result, BatonT>::await(std::forward<F>(func));
542 }
543 }
544 }