Make RequestContext provider overridable in order to save cost of setContext() on...
[folly.git] / folly / fibers / FiberManagerInternal-inl.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
25 #ifdef __APPLE__
26 #include <folly/ThreadLocal.h>
27 #endif
28 #include <folly/Try.h>
29 #include <folly/fibers/Baton.h>
30 #include <folly/fibers/Fiber.h>
31 #include <folly/fibers/LoopController.h>
32 #include <folly/fibers/Promise.h>
33
34 namespace folly {
35 namespace fibers {
36
37 namespace {
38
39 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
40 #if defined(FOLLY_SANITIZE_ADDRESS) || defined(UNDEFINED_SANITIZER) || \
41     defined(FOLLY_SANITIZE_THREAD)
42   /* Sanitizers need a lot of extra stack space.
43      16x is a conservative estimate, 8x also worked with tests
44      where it mattered.  Note that overallocating here does not necessarily
45      increase RSS, since unused memory is pretty much free. */
46   opts.stackSize *= 16;
47 #endif
48   return opts;
49 }
50
51 } // namespace
52
53 inline void FiberManager::ensureLoopScheduled() {
54   if (isLoopScheduled_) {
55     return;
56   }
57
58   isLoopScheduled_ = true;
59   loopController_->schedule();
60 }
61
62 inline void FiberManager::activateFiber(Fiber* fiber) {
63   DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
64
65 #ifdef FOLLY_SANITIZE_ADDRESS
66   DCHECK(!fiber->asanMainStackBase_);
67   DCHECK(!fiber->asanMainStackSize_);
68   auto stack = fiber->getStack();
69   void* asanFakeStack;
70   registerStartSwitchStackWithAsan(&asanFakeStack, stack.first, stack.second);
71   SCOPE_EXIT {
72     registerFinishSwitchStackWithAsan(asanFakeStack, nullptr, nullptr);
73     fiber->asanMainStackBase_ = nullptr;
74     fiber->asanMainStackSize_ = 0;
75   };
76 #endif
77
78   activeFiber_ = fiber;
79   fiber->fiberImpl_.activate();
80 }
81
82 inline void FiberManager::deactivateFiber(Fiber* fiber) {
83   DCHECK_EQ(activeFiber_, fiber);
84
85 #ifdef FOLLY_SANITIZE_ADDRESS
86   DCHECK(fiber->asanMainStackBase_);
87   DCHECK(fiber->asanMainStackSize_);
88
89   registerStartSwitchStackWithAsan(
90       &fiber->asanFakeStack_,
91       fiber->asanMainStackBase_,
92       fiber->asanMainStackSize_);
93   SCOPE_EXIT {
94     registerFinishSwitchStackWithAsan(
95         fiber->asanFakeStack_,
96         &fiber->asanMainStackBase_,
97         &fiber->asanMainStackSize_);
98     fiber->asanFakeStack_ = nullptr;
99   };
100 #endif
101
102   activeFiber_ = nullptr;
103   fiber->fiberImpl_.deactivate();
104 }
105
106 inline void FiberManager::runReadyFiber(Fiber* fiber) {
107   SCOPE_EXIT {
108     assert(currentFiber_ == nullptr);
109     assert(activeFiber_ == nullptr);
110   };
111
112   assert(
113       fiber->state_ == Fiber::NOT_STARTED ||
114       fiber->state_ == Fiber::READY_TO_RUN);
115   currentFiber_ = fiber;
116   if (observer_) {
117     observer_->starting(reinterpret_cast<uintptr_t>(fiber));
118   }
119
120   while (fiber->state_ == Fiber::NOT_STARTED ||
121          fiber->state_ == Fiber::READY_TO_RUN) {
122     activateFiber(fiber);
123     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
124       try {
125         immediateFunc_();
126       } catch (...) {
127         exceptionCallback_(std::current_exception(), "running immediateFunc_");
128       }
129       immediateFunc_ = nullptr;
130       fiber->state_ = Fiber::READY_TO_RUN;
131     }
132   }
133
134   if (fiber->state_ == Fiber::AWAITING) {
135     awaitFunc_(*fiber);
136     awaitFunc_ = nullptr;
137     if (observer_) {
138       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
139     }
140     currentFiber_ = nullptr;
141   } else if (fiber->state_ == Fiber::INVALID) {
142     assert(fibersActive_ > 0);
143     --fibersActive_;
144     // Making sure that task functor is deleted once task is complete.
145     // NOTE: we must do it on main context, as the fiber is not
146     // running at this point.
147     fiber->func_ = nullptr;
148     fiber->resultFunc_ = nullptr;
149     if (fiber->finallyFunc_) {
150       try {
151         fiber->finallyFunc_();
152       } catch (...) {
153         exceptionCallback_(std::current_exception(), "running finallyFunc_");
154       }
155       fiber->finallyFunc_ = nullptr;
156     }
157     // Make sure LocalData is not accessible from its destructor
158     if (observer_) {
159       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
160     }
161     currentFiber_ = nullptr;
162     fiber->localData_.reset();
163     fiber->rcontext_.reset();
164
165     if (fibersPoolSize_ < options_.maxFibersPoolSize ||
166         options_.fibersPoolResizePeriodMs > 0) {
167       fibersPool_.push_front(*fiber);
168       ++fibersPoolSize_;
169     } else {
170       delete fiber;
171       assert(fibersAllocated_ > 0);
172       --fibersAllocated_;
173     }
174   } else if (fiber->state_ == Fiber::YIELDED) {
175     if (observer_) {
176       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
177     }
178     currentFiber_ = nullptr;
179     fiber->state_ = Fiber::READY_TO_RUN;
180     yieldedFibers_.push_back(*fiber);
181   }
182 }
183
184 inline void FiberManager::loopUntilNoReady() {
185   return loopController_->runLoop();
186 }
187
188 inline void FiberManager::loopUntilNoReadyImpl() {
189 #ifndef _WIN32
190   if (UNLIKELY(!alternateSignalStackRegistered_)) {
191     registerAlternateSignalStack();
192   }
193 #endif
194
195   // Support nested FiberManagers
196   auto originalFiberManager = this;
197   std::swap(currentFiberManager_, originalFiberManager);
198
199   RequestContext::Provider oldRequestContextProvider;
200   auto newRequestContextProvider =
201       [this, &oldRequestContextProvider]() -> std::shared_ptr<RequestContext>& {
202     return currentFiber_ ? currentFiber_->rcontext_
203                          : oldRequestContextProvider();
204   };
205   oldRequestContextProvider = RequestContext::setRequestContextProvider(
206       std::ref(newRequestContextProvider));
207
208   SCOPE_EXIT {
209     isLoopScheduled_ = false;
210     // Restore RequestContext provider before call to ensureLoopScheduled()
211     RequestContext::setRequestContextProvider(
212         std::move(oldRequestContextProvider));
213     if (!readyFibers_.empty()) {
214       ensureLoopScheduled();
215     }
216     std::swap(currentFiberManager_, originalFiberManager);
217     CHECK_EQ(this, originalFiberManager);
218   };
219
220   bool hadRemoteFiber = true;
221   while (hadRemoteFiber) {
222     hadRemoteFiber = false;
223
224     while (!readyFibers_.empty()) {
225       auto& fiber = readyFibers_.front();
226       readyFibers_.pop_front();
227       runReadyFiber(&fiber);
228     }
229
230     remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
231       runReadyFiber(fiber);
232       hadRemoteFiber = true;
233     });
234
235     remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
236       std::unique_ptr<RemoteTask> task(taskPtr);
237       auto fiber = getFiber();
238       if (task->localData) {
239         fiber->localData_ = *task->localData;
240       }
241       fiber->rcontext_ = std::move(task->rcontext);
242
243       fiber->setFunction(std::move(task->func));
244       if (observer_) {
245         observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
246       }
247       runReadyFiber(fiber);
248       hadRemoteFiber = true;
249     });
250   }
251
252   if (observer_) {
253     for (auto& yielded : yieldedFibers_) {
254       observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
255     }
256   }
257   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
258 }
259
260 // We need this to be in a struct, not inlined in addTask, because clang crashes
261 // otherwise.
262 template <typename F>
263 struct FiberManager::AddTaskHelper {
264   class Func;
265
266   static constexpr bool allocateInBuffer =
267       sizeof(Func) <= Fiber::kUserBufferSize;
268
269   class Func {
270    public:
271     Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
272
273     void operator()() {
274       try {
275         func_();
276       } catch (...) {
277         fm_.exceptionCallback_(
278             std::current_exception(), "running Func functor");
279       }
280       if (allocateInBuffer) {
281         this->~Func();
282       } else {
283         delete this;
284       }
285     }
286
287    private:
288     F func_;
289     FiberManager& fm_;
290   };
291 };
292
293 template <typename F>
294 void FiberManager::addTask(F&& func) {
295   typedef AddTaskHelper<F> Helper;
296
297   auto fiber = getFiber();
298   initLocalData(*fiber);
299
300   if (Helper::allocateInBuffer) {
301     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
302     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
303
304     fiber->setFunction(std::ref(*funcLoc));
305   } else {
306     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
307
308     fiber->setFunction(std::ref(*funcLoc));
309   }
310
311   readyFibers_.push_back(*fiber);
312   if (observer_) {
313     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
314   }
315
316   ensureLoopScheduled();
317 }
318
319 template <typename F>
320 void FiberManager::addTaskRemote(F&& func) {
321   auto task = [&]() {
322     auto currentFm = getFiberManagerUnsafe();
323     if (currentFm && currentFm->currentFiber_ &&
324         currentFm->localType_ == localType_) {
325       return std::make_unique<RemoteTask>(
326           std::forward<F>(func), currentFm->currentFiber_->localData_);
327     }
328     return std::make_unique<RemoteTask>(std::forward<F>(func));
329   }();
330   auto insertHead = [&]() {
331     return remoteTaskQueue_.insertHead(task.release());
332   };
333   loopController_->scheduleThreadSafe(std::ref(insertHead));
334 }
335
336 template <typename X>
337 struct IsRvalueRefTry {
338   static const bool value = false;
339 };
340 template <typename T>
341 struct IsRvalueRefTry<folly::Try<T>&&> {
342   static const bool value = true;
343 };
344
345 // We need this to be in a struct, not inlined in addTaskFinally, because clang
346 // crashes otherwise.
347 template <typename F, typename G>
348 struct FiberManager::AddTaskFinallyHelper {
349   class Func;
350
351   typedef typename std::result_of<F()>::type Result;
352
353   class Finally {
354    public:
355     Finally(G finally, FiberManager& fm)
356         : finally_(std::move(finally)), fm_(fm) {}
357
358     void operator()() {
359       try {
360         finally_(std::move(*result_));
361       } catch (...) {
362         fm_.exceptionCallback_(
363             std::current_exception(), "running Finally functor");
364       }
365
366       if (allocateInBuffer) {
367         this->~Finally();
368       } else {
369         delete this;
370       }
371     }
372
373    private:
374     friend class Func;
375
376     G finally_;
377     folly::Optional<folly::Try<Result>> result_;
378     FiberManager& fm_;
379   };
380
381   class Func {
382    public:
383     Func(F func, Finally& finally)
384         : func_(std::move(func)), result_(finally.result_) {}
385
386     void operator()() {
387       result_ = folly::makeTryWith(std::move(func_));
388
389       if (allocateInBuffer) {
390         this->~Func();
391       } else {
392         delete this;
393       }
394     }
395
396    private:
397     F func_;
398     folly::Optional<folly::Try<Result>>& result_;
399   };
400
401   static constexpr bool allocateInBuffer =
402       sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
403 };
404
405 template <typename F, typename G>
406 void FiberManager::addTaskFinally(F&& func, G&& finally) {
407   typedef typename std::result_of<F()>::type Result;
408
409   static_assert(
410       IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
411       "finally(arg): arg must be Try<T>&&");
412   static_assert(
413       std::is_convertible<
414           Result,
415           typename std::remove_reference<
416               typename FirstArgOf<G>::type>::type::element_type>::value,
417       "finally(Try<T>&&): T must be convertible from func()'s return type");
418
419   auto fiber = getFiber();
420   initLocalData(*fiber);
421
422   typedef AddTaskFinallyHelper<
423       typename std::decay<F>::type,
424       typename std::decay<G>::type>
425       Helper;
426
427   if (Helper::allocateInBuffer) {
428     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
429     auto finallyLoc =
430         static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
431
432     new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
433     new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
434
435     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
436   } else {
437     auto finallyLoc =
438         new typename Helper::Finally(std::forward<G>(finally), *this);
439     auto funcLoc =
440         new typename Helper::Func(std::forward<F>(func), *finallyLoc);
441
442     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
443   }
444
445   readyFibers_.push_back(*fiber);
446   if (observer_) {
447     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
448   }
449
450   ensureLoopScheduled();
451 }
452
453 template <typename F>
454 typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
455   if (UNLIKELY(activeFiber_ == nullptr)) {
456     return func();
457   }
458
459   typedef typename std::result_of<F()>::type Result;
460
461   folly::Try<Result> result;
462   auto f = [&func, &result]() mutable {
463     result = folly::makeTryWith(std::forward<F>(func));
464   };
465
466   immediateFunc_ = std::ref(f);
467   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
468
469   return std::move(result).value();
470 }
471
472 inline FiberManager& FiberManager::getFiberManager() {
473   assert(currentFiberManager_ != nullptr);
474   return *currentFiberManager_;
475 }
476
477 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
478   return currentFiberManager_;
479 }
480
481 inline bool FiberManager::hasActiveFiber() const {
482   return activeFiber_ != nullptr;
483 }
484
485 inline void FiberManager::yield() {
486   assert(currentFiberManager_ == this);
487   assert(activeFiber_ != nullptr);
488   assert(activeFiber_->state_ == Fiber::RUNNING);
489   activeFiber_->preempt(Fiber::YIELDED);
490 }
491
492 template <typename T>
493 T& FiberManager::local() {
494   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
495     return currentFiber_->localData_.get<T>();
496   }
497   return localThread<T>();
498 }
499
500 template <typename T>
501 T& FiberManager::localThread() {
502 #ifndef __APPLE__
503   static thread_local T t;
504   return t;
505 #else // osx doesn't support thread_local
506   static ThreadLocal<T> t;
507   return *t;
508 #endif
509 }
510
511 inline void FiberManager::initLocalData(Fiber& fiber) {
512   auto fm = getFiberManagerUnsafe();
513   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
514     fiber.localData_ = fm->currentFiber_->localData_;
515   }
516   fiber.rcontext_ = RequestContext::saveContext();
517 }
518
519 template <typename LocalT>
520 FiberManager::FiberManager(
521     LocalType<LocalT>,
522     std::unique_ptr<LoopController> loopController__,
523     Options options)
524     : loopController_(std::move(loopController__)),
525       stackAllocator_(options.useGuardPages),
526       options_(preprocessOptions(std::move(options))),
527       exceptionCallback_([](std::exception_ptr eptr, std::string context) {
528         try {
529           std::rethrow_exception(eptr);
530         } catch (const std::exception& e) {
531           LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
532                       << e.what() << "' was thrown in "
533                       << "FiberManager with context '" << context << "'";
534         } catch (...) {
535           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
536                       << "context '" << context << "'";
537         }
538       }),
539       timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
540       fibersPoolResizer_(*this),
541       localType_(typeid(LocalT)) {
542   loopController_->setFiberManager(this);
543 }
544
545 template <typename F>
546 typename FirstArgOf<F>::type::value_type inline await(F&& func) {
547   typedef typename FirstArgOf<F>::type::value_type Result;
548   typedef typename FirstArgOf<F>::type::baton_type BatonT;
549
550   return Promise<Result, BatonT>::await(std::forward<F>(func));
551 }
552 }
553 }