2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
26 #include <folly/ThreadLocal.h>
28 #include <folly/fibers/Baton.h>
29 #include <folly/fibers/Fiber.h>
30 #include <folly/fibers/LoopController.h>
31 #include <folly/fibers/Promise.h>
32 #include <folly/Try.h>
// Takes the caller-supplied FiberManager options by value and hands back the
// options the manager should actually use.
// NOTE(review): the embedded numbering jumps from 44 to 52, so the statement
// guarded by the sanitizer #if and the return are not visible in this listing.
// Going by the comment below, the guarded code presumably enlarges the fiber
// stack size for ASAN/UBSAN builds — confirm against the full file.
39 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
40 #if defined(FOLLY_SANITIZE_ADDRESS) || defined(UNDEFINED_SANITIZER)
41 /* ASAN/UBSAN needs a lot of extra stack space.
42 16x is a conservative estimate, 8x also worked with tests
43 where it mattered. Note that overallocating here does not necessarily
44 increase RSS, since unused memory is pretty much free. */
// Records that a loop run is pending (isLoopScheduled_) and asks the loop
// controller to schedule one.
// NOTE(review): the body of the `if` (embedded lines 54-55) is not visible in
// this listing; presumably it returns early so that schedule() is not called
// again while a run is already pending — confirm.
52 inline void FiberManager::ensureLoopScheduled() {
53 if (isLoopScheduled_) {
57 isLoopScheduled_ = true;
58 loopController_->schedule();
// Switches execution into `fiber` by calling fiberImpl_.activate().
// Expects that no fiber is active on this manager (DCHECK on activeFiber_).
// NOTE(review): embedded lines 68, 70 and 74-77 are missing from this listing,
// so the declaration of `asanFakeStack`, whatever wraps the "finish" call, the
// end of the #ifdef and any statement just before activate() are not visible.
61 inline void FiberManager::activateFiber(Fiber* fiber) {
62 DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
// ASAN-only bookkeeping around the stack switch.
64 #ifdef FOLLY_SANITIZE_ADDRESS
// The fiber must not have a recorded main-stack base/size at this point.
65 DCHECK(!fiber->asanMainStackBase_);
66 DCHECK(!fiber->asanMainStackSize_);
67 auto stack = fiber->getStack();
// Announce the switch, passing the two halves of the pair from getStack().
69 registerStartSwitchStackWithAsan(&asanFakeStack, stack.first, stack.second);
// NOTE(review): these three statements may be the body of a deferred scope
// guard (the file includes folly/ScopeGuard.h and embedded line 70 is absent),
// in which case they would run on the way out rather than here — confirm.
71 registerFinishSwitchStackWithAsan(asanFakeStack, nullptr, nullptr);
72 fiber->asanMainStackBase_ = nullptr;
73 fiber->asanMainStackSize_ = 0;
78 fiber->fiberImpl_.activate();
// Leaves `fiber`: clears activeFiber_ and calls fiberImpl_.deactivate().
// Expects `fiber` to be the currently active fiber (DCHECK on activeFiber_).
// NOTE(review): embedded lines 92 and 98-100 are missing from this listing, so
// whatever wraps the "finish" call and the end of the #ifdef are not visible.
81 inline void FiberManager::deactivateFiber(Fiber* fiber) {
82 DCHECK_EQ(activeFiber_, fiber);
// ASAN-only bookkeeping around the stack switch.
84 #ifdef FOLLY_SANITIZE_ADDRESS
// A main-stack base and size must have been recorded on the fiber by now.
85 DCHECK(fiber->asanMainStackBase_);
86 DCHECK(fiber->asanMainStackSize_);
// Announce the switch, passing the recorded main-stack base and size; the
// fake-stack handle is written into fiber->asanFakeStack_.
88 registerStartSwitchStackWithAsan(
89 &fiber->asanFakeStack_,
90 fiber->asanMainStackBase_,
91 fiber->asanMainStackSize_);
// NOTE(review): the finish call and the reset below may be the body of a
// deferred scope guard (embedded line 92 is absent) — confirm. The finish call
// receives the addresses of asanMainStackBase_/asanMainStackSize_, so it can
// write to them.
93 registerFinishSwitchStackWithAsan(
94 fiber->asanFakeStack_,
95 &fiber->asanMainStackBase_,
96 &fiber->asanMainStackSize_);
97 fiber->asanFakeStack_ = nullptr;
101 activeFiber_ = nullptr;
102 fiber->fiberImpl_.deactivate();
// Runs `fiber` for as long as it is runnable, then handles it according to the
// state it stopped in: AWAITING, INVALID or YIELDED.
// NOTE(review): the embedded line numbers skip in many places here (for
// example 109-111, 124-126, 135, 151, 153, 170-175), so the opening of the
// state assert, the try/catch lines, several calls and the else-branch of the
// pool check are not visible in this listing.
105 inline void FiberManager::runReadyFiber(Fiber* fiber) {
// Preconditions: no fiber is current or active on this manager.
107 assert(currentFiber_ == nullptr);
108 assert(activeFiber_ == nullptr);
// Tail of a state check whose opening line is not visible: the fiber must be
// NOT_STARTED or READY_TO_RUN.
112 fiber->state_ == Fiber::NOT_STARTED ||
113 fiber->state_ == Fiber::READY_TO_RUN);
// Make the fiber current and hand its request context to
// RequestContext::setContext; whatever setContext returns is stored back in
// rcontext_ (presumably the previously installed context, so that the same
// statement further down swaps it back — confirm).
114 currentFiber_ = fiber;
115 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
117 observer_->starting(reinterpret_cast<uintptr_t>(fiber));
// Keep switching into the fiber while it is, or becomes again, runnable.
120 while (fiber->state_ == Fiber::NOT_STARTED ||
121 fiber->state_ == Fiber::READY_TO_RUN) {
122 activateFiber(fiber);
// AWAITING_IMMEDIATE: something went wrong "running immediateFunc_" is reported
// through exceptionCallback_. NOTE(review): the invocation of immediateFunc_
// itself (embedded lines 124-126) is not visible here.
123 if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
127 exceptionCallback_(std::current_exception(), "running immediateFunc_");
129 immediateFunc_ = nullptr;
// Back to READY_TO_RUN so the enclosing while loop activates the fiber again.
130 fiber->state_ = Fiber::READY_TO_RUN;
// AWAITING: drop awaitFunc_, notify the observer, and stop treating the fiber
// as current. NOTE(review): embedded line 135 (presumably the awaitFunc_ call)
// is not visible.
134 if (fiber->state_ == Fiber::AWAITING) {
136 awaitFunc_ = nullptr;
138 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
140 currentFiber_ = nullptr;
141 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
// INVALID: the task is over; release everything attached to the fiber.
142 } else if (fiber->state_ == Fiber::INVALID) {
143 assert(fibersActive_ > 0);
145 // Making sure that task functor is deleted once task is complete.
146 // NOTE: we must do it on main context, as the fiber is not
147 // running at this point.
148 fiber->func_ = nullptr;
149 fiber->resultFunc_ = nullptr;
// Run the optional finally callback; a failure "running finallyFunc_" goes to
// exceptionCallback_ (the try/catch lines themselves are not visible here).
150 if (fiber->finallyFunc_) {
152 fiber->finallyFunc_();
154 exceptionCallback_(std::current_exception(), "running finallyFunc_");
156 fiber->finallyFunc_ = nullptr;
158 // Make sure LocalData is not accessible from its destructor
160 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
// currentFiber_ is cleared before localData_ is reset, matching the comment
// above.
162 currentFiber_ = nullptr;
163 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
164 fiber->localData_.reset();
165 fiber->rcontext_.reset();
// Put the fiber back at the front of fibersPool_ when the pool is below
// options_.maxFibersPoolSize or options_.fibersPoolResizePeriodMs is positive.
// NOTE(review): the rest of this branch and the alternative branch (embedded
// lines 170-175) are only partly visible.
167 if (fibersPoolSize_ < options_.maxFibersPoolSize ||
168 options_.fibersPoolResizePeriodMs > 0) {
169 fibersPool_.push_front(*fiber);
173 assert(fibersAllocated_ > 0);
// YIELDED: notify the observer, stop treating the fiber as current, mark it
// READY_TO_RUN and append it to yieldedFibers_.
176 } else if (fiber->state_ == Fiber::YIELDED) {
178 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
180 currentFiber_ = nullptr;
181 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
182 fiber->state_ = Fiber::READY_TO_RUN;
183 yieldedFibers_.push_back(*fiber);
// Delegates to the loop controller's runLoop().
187 inline void FiberManager::loopUntilNoReady() {
188 return loopController_->runLoop();
// Runs fibers from readyFibers_, then sweeps the two remote queues, repeating
// while a sweep produced work; finally moves yielded fibers back into
// readyFibers_.
// NOTE(review): several embedded lines are missing from this listing (for
// example 192, 195-197, 201-202, 206, 209-210), so preprocessor guards, scope
// openers and closing braces are not visible.
191 inline void FiberManager::loopUntilNoReadyImpl() {
// Register the alternate signal stack the first time through (flag-guarded).
193 if (UNLIKELY(!alternateSignalStackRegistered_)) {
194 registerAlternateSignalStack();
198 // Support nested FiberManagers
// After the swap, currentFiberManager_ is `this` and originalFiberManager holds
// the previous value.
199 auto originalFiberManager = this;
200 std::swap(currentFiberManager_, originalFiberManager);
// NOTE(review): the statements down to the CHECK_EQ may be the body of a
// deferred scope guard (embedded line 202 is absent; the file includes
// folly/ScopeGuard.h), i.e. run when the function exits — confirm.
// They clear isLoopScheduled_, request another run if readyFibers_ is not
// empty, swap currentFiberManager_ back, and check that the swap round-tripped.
203 isLoopScheduled_ = false;
204 if (!readyFibers_.empty()) {
205 ensureLoopScheduled();
207 std::swap(currentFiberManager_, originalFiberManager);
208 CHECK_EQ(this, originalFiberManager);
// Starts true so the outer loop runs at least once; each pass clears it, and
// the sweeps below set it again when they ran something.
211 bool hadRemoteFiber = true;
212 while (hadRemoteFiber) {
213 hadRemoteFiber = false;
// Drain the local ready list; each fiber is unlinked before it is run.
215 while (!readyFibers_.empty()) {
216 auto& fiber = readyFibers_.front();
217 readyFibers_.pop_front();
218 runReadyFiber(&fiber);
// Run every fiber handed over through remoteReadyQueue_.
221 remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
222 runReadyFiber(fiber);
223 hadRemoteFiber = true;
// Turn every queued RemoteTask into a fiber and run it. The unique_ptr takes
// ownership of the raw task pointer passed to the callback.
226 remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
227 std::unique_ptr<RemoteTask> task(taskPtr);
228 auto fiber = getFiber();
// Copy the task's local data into the fiber when the task carries any.
229 if (task->localData) {
230 fiber->localData_ = *task->localData;
232 fiber->rcontext_ = std::move(task->rcontext);
234 fiber->setFunction(std::move(task->func));
236 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
238 runReadyFiber(fiber);
239 hadRemoteFiber = true;
// Report each yielded fiber as runnable, then append the whole yielded list to
// the end of readyFibers_.
244 for (auto& yielded : yieldedFibers_) {
245 observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
248 readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
// Helper for addTask: wraps the user functor F in a nested Func type that also
// holds a reference to the owning FiberManager.
// NOTE(review): most of this struct is missing from the listing (embedded
// lines 252, 255-256, 259-261, 263-267, 270, 272-283), including the Func class
// header, its call operator and its data members.
251 // We need this to be in a struct, not inlined in addTask, because clang crashes
253 template <typename F>
254 struct FiberManager::AddTaskHelper {
// True when a Func object fits in Fiber::kUserBufferSize bytes.
257 static constexpr bool allocateInBuffer =
258 sizeof(Func) <= Fiber::kUserBufferSize;
// Stores the forwarded functor and a reference to the manager.
262 Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
// A failure "running Func functor" is reported through the manager's
// exceptionCallback_ (the surrounding try/catch lines are not visible).
268 fm_.exceptionCallback_(
269 std::current_exception(), "running Func functor");
// NOTE(review): both branches of this check are not visible; presumably the
// cleanup differs depending on whether the object lives in the fiber's buffer
// or on the heap — confirm.
271 if (allocateInBuffer) {
// Queues `func` to run on a fiber of this manager: takes a fiber from
// getFiber(), initialises its local data, attaches the functor, appends the
// fiber to readyFibers_ and makes sure a loop run is scheduled.
// NOTE(review): embedded line 296 (presumably the `} else {` separating the two
// allocation paths) and the closing braces are missing from this listing.
284 template <typename F>
285 void FiberManager::addTask(F&& func) {
286 typedef AddTaskHelper<F> Helper;
288 auto fiber = getFiber();
289 initLocalData(*fiber);
// Small functors are constructed with placement new inside the fiber's user
// buffer.
291 if (Helper::allocateInBuffer) {
292 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
293 new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
// The fiber only receives a reference wrapper to the Func object.
295 fiber->setFunction(std::ref(*funcLoc));
// Otherwise the Func object is allocated with plain `new`. No delete is visible
// in this function; NOTE(review): presumably Func releases itself after it has
// run — confirm in AddTaskHelper.
297 auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
299 fiber->setFunction(std::ref(*funcLoc));
302 readyFibers_.push_back(*fiber);
304 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
307 ensureLoopScheduled();
// Wraps `func` in a RemoteTask, inserts it at the head of remoteTaskQueue_ and
// hands that insertion to loopController_->scheduleThreadSafe().
// NOTE(review): embedded lines 312, 318, 320, 323 and 325 are missing from this
// listing. The two `return folly::make_unique<RemoteTask>(...)` statements
// cannot belong to this void function directly; presumably they sit inside an
// immediately-invoked lambda that initialises `task` — confirm.
310 template <typename F>
311 void FiberManager::addTaskRemote(F&& func) {
313 auto currentFm = getFiberManagerUnsafe();
// When called from a fiber whose manager uses the same local-data type, the
// task is constructed with that fiber's localData_ as a second argument.
314 if (currentFm && currentFm->currentFiber_ &&
315 currentFm->localType_ == localType_) {
316 return folly::make_unique<RemoteTask>(
317 std::forward<F>(func), currentFm->currentFiber_->localData_);
// Otherwise the task is built from the functor alone.
319 return folly::make_unique<RemoteTask>(std::forward<F>(func));
// release() hands the raw RemoteTask pointer to the queue.
321 auto insertHead = [&]() {
322 return remoteTaskQueue_.insertHead(task.release());
// The lambda is passed by reference wrapper, not copied.
324 loopController_->scheduleThreadSafe(std::ref(insertHead));
// Type trait: `value` is false for an arbitrary type X ...
327 template <typename X>
328 struct IsRvalueRefTry {
329 static const bool value = false;
// ... and true only for an rvalue reference to folly::Try<T>.
331 template <typename T>
332 struct IsRvalueRefTry<folly::Try<T>&&> {
333 static const bool value = true;
// Helper for addTaskFinally. Two nested pieces are visible: Finally, which owns
// the optional result and invokes the user's `finally` callback with it, and
// Func, which runs the user's functor and stores its outcome into Finally's
// result.
// NOTE(review): the class headers, call operators, try/catch lines and several
// data members are missing from this listing (embedded lines 340-341, 343-345,
// 348-350, 352, 355-356, 358-367, 369-373, 376-377, 379, 381-388, 390-391).
336 // We need this to be in a struct, not inlined in addTaskFinally, because clang
337 // crashes otherwise.
338 template <typename F, typename G>
339 struct FiberManager::AddTaskFinallyHelper {
// The type produced by calling F with no arguments.
342 typedef typename std::result_of<F()>::type Result;
// Finally takes the callback by value and moves it into place; it also keeps a
// reference to the manager.
346 Finally(G finally, FiberManager& fm)
347 : finally_(std::move(finally)), fm_(fm) {}
// The stored Try is moved into the callback.
351 finally_(std::move(*result_));
// A failure "running Finally functor" is reported through the manager's
// exceptionCallback_.
353 fm_.exceptionCallback_(
354 std::current_exception(), "running Finally functor");
// NOTE(review): both branches of this check are not visible.
357 if (allocateInBuffer) {
// Empty until Func assigns to it through its reference (see below).
368 folly::Optional<folly::Try<Result>> result_;
// Func binds its result_ reference to the Finally object's result_.
374 Func(F func, Finally& finally)
375 : func_(std::move(func)), result_(finally.result_) {}
// The functor's outcome is captured with makeTryWith and stored in the shared
// slot.
378 result_ = folly::makeTryWith(std::move(func_));
// NOTE(review): both branches of this check are not visible.
380 if (allocateInBuffer) {
389 folly::Optional<folly::Try<Result>>& result_;
// True when a Func and a Finally together fit in Fiber::kUserBufferSize bytes.
392 static constexpr bool allocateInBuffer =
393 sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
// Queues `func` to run on a fiber together with a `finally` callback: builds a
// Helper::Finally and a Helper::Func that refers to it, attaches both to the
// fiber, appends the fiber to readyFibers_ and makes sure a loop run is
// scheduled.
// NOTE(review): embedded lines 399-400, 403-405, 409, 416-417, 420, 422, 425,
// 427-428, 430 and 432 are missing from this listing, so the openings of the two
// compile-time checks, the name given to the Helper typedef, the declarations
// of `finallyLoc`/`funcLoc` and the `else` line are not visible.
396 template <typename F, typename G>
397 void FiberManager::addTaskFinally(F&& func, G&& finally) {
398 typedef typename std::result_of<F()>::type Result;
// Compile-time requirement (message below): the callback's first argument must
// be Try<T>&&.
401 IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
402 "finally(arg): arg must be Try<T>&&");
// Compile-time requirement (message below): T must be convertible from func()'s
// return type.
406 typename std::remove_reference<
407 typename FirstArgOf<G>::type>::type::element_type>::value,
408 "finally(Try<T>&&): T must be convertible from func()'s return type");
410 auto fiber = getFiber();
411 initLocalData(*fiber);
// The helper is instantiated on the decayed functor and callback types.
413 typedef AddTaskFinallyHelper<
414 typename std::decay<F>::type,
415 typename std::decay<G>::type>
// In-buffer path: Func sits at the start of the fiber's user buffer and Finally
// immediately after it (funcLoc + 1).
418 if (Helper::allocateInBuffer) {
419 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
421 static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
// Finally is constructed first because Func's constructor takes a reference to
// it.
423 new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
424 new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
426 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
// Heap path: both objects are created with plain `new`; no delete is visible in
// this function. NOTE(review): presumably the helper objects release
// themselves — confirm in AddTaskFinallyHelper.
429 new typename Helper::Finally(std::forward<G>(finally), *this);
431 new typename Helper::Func(std::forward<F>(func), *finallyLoc);
433 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
436 readyFibers_.push_back(*fiber);
438 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
441 ensureLoopScheduled();
// Runs `func` via the immediateFunc_ hook and returns its result. When called
// from an active fiber, the fiber is preempted with AWAITING_IMMEDIATE (a state
// runReadyFiber handles specially) and the outcome is passed back through a
// folly::Try.
// NOTE(review): the body of the first `if` (embedded lines 447-448) is not
// visible; presumably `func` is called directly when no fiber is active —
// confirm.
444 template <typename F>
445 typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
446 if (UNLIKELY(activeFiber_ == nullptr)) {
450 typedef typename std::result_of<F()>::type Result;
// `result` receives either the value or the exception produced by func.
452 folly::Try<Result> result;
453 auto f = [&func, &result]() mutable {
454 result = folly::makeTryWith(std::forward<F>(func));
// immediateFunc_ holds only a reference wrapper to the local lambda `f`, which
// in turn refers to the locals `func` and `result`.
457 immediateFunc_ = std::ref(f);
458 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
// value() is called on the Try as an rvalue.
460 return std::move(result).value();
// Returns a reference to currentFiberManager_. Asserts that it is set, so this
// must only be called where a current manager exists.
463 inline FiberManager& FiberManager::getFiberManager() {
464 assert(currentFiberManager_ != nullptr);
465 return *currentFiberManager_;
// Returns currentFiberManager_ as a raw pointer without checking it; callers in
// this file test the result for null before using it.
468 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
469 return currentFiberManager_;
// True when activeFiber_ is set on this manager.
472 inline bool FiberManager::hasActiveFiber() const {
473 return activeFiber_ != nullptr;
// Preempts the active fiber with state YIELDED; runReadyFiber then marks such a
// fiber READY_TO_RUN and appends it to yieldedFibers_.
// Asserts that this manager is the current one and that it has an active fiber
// in the RUNNING state.
476 inline void FiberManager::yield() {
477 assert(currentFiberManager_ == this);
478 assert(activeFiber_ != nullptr);
479 assert(activeFiber_->state_ == Fiber::RUNNING);
480 activeFiber_->preempt(Fiber::YIELDED);
// Returns a reference to a T. When T is this manager's local-data type
// (localType_) and a fiber is current, the reference is into that fiber's
// localData_; in every other case it comes from localThread<T>().
483 template <typename T>
484 T& FiberManager::local() {
485 if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
486 return currentFiber_->localData_.get<T>();
488 return localThread<T>();
// Returns a reference to a T held in a function-local static: declared
// `static thread_local T` in one preprocessor branch and `static ThreadLocal<T>`
// in the other.
// NOTE(review): the opening preprocessor condition (embedded line 493) and both
// return statements are missing from this listing; the #else comment suggests
// the condition distinguishes macOS — confirm.
491 template <typename T>
492 T& FiberManager::localThread() {
494 static thread_local T t;
496 #else // osx doesn't support thread_local
497 static ThreadLocal<T> t;
// Prepares a fiber that is about to be queued: optionally copies local data
// from the calling fiber and stores RequestContext::saveContext() in rcontext_.
502 inline void FiberManager::initLocalData(Fiber& fiber) {
503 auto fm = getFiberManagerUnsafe();
// Copy local data only when called from a fiber whose manager uses the same
// local-data type as this one.
504 if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
505 fiber.localData_ = fm->currentFiber_->localData_;
507 fiber.rcontext_ = RequestContext::saveContext();
// Constructor templated on the local-data type LocalT: takes ownership of the
// loop controller, stores the preprocessed options, installs a default
// exception callback and registers this manager with the controller.
// NOTE(review): embedded lines 512, 514, 519, 525 and 528-529 are missing from
// this listing, so the first and last parameters and the try / catch(...) lines
// inside the callback are not visible.
510 template <typename LocalT>
511 FiberManager::FiberManager(
513 std::unique_ptr<LoopController> loopController__,
515 : loopController_(std::move(loopController__)),
// useGuardPages is read from `options` here; the next initializer passes
// std::move(options) to preprocessOptions. Members are initialized in
// declaration order, not in the order written here, so the class definition
// decides which of the two runs first — check that when reordering members.
516 stackAllocator_(options.useGuardPages),
517 options_(preprocessOptions(std::move(options))),
// Default callback: rethrows the exception pointer in order to inspect it and
// logs at DFATAL, with the type name and what() for std::exception and a
// generic message otherwise.
518 exceptionCallback_([](std::exception_ptr eptr, std::string context) {
520 std::rethrow_exception(eptr);
521 } catch (const std::exception& e) {
522 LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
523 << e.what() << "' was thrown in "
524 << "FiberManager with context '" << context << "'";
526 LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
527 << "context '" << context << "'";
// The timeout controller is built from a reference to the loop controller, which
// this object owns.
530 timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
531 fibersPoolResizer_(*this),
532 localType_(typeid(LocalT)) {
533 loopController_->setFiberManager(this);
// Free-function convenience wrapper: takes the value type and baton type from
// the first argument type of `func` (via FirstArgOf) and forwards to
// Promise<Result, BatonT>::await, returning whatever that returns.
536 template <typename F>
537 typename FirstArgOf<F>::type::value_type inline await(F&& func) {
538 typedef typename FirstArgOf<F>::type::value_type Result;
539 typedef typename FirstArgOf<F>::type::baton_type BatonT;
541 return Promise<Result, BatonT>::await(std::forward<F>(func));