2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
26 #include <folly/ThreadLocal.h>
28 #include <folly/Try.h>
29 #include <folly/fibers/Baton.h>
30 #include <folly/fibers/Fiber.h>
31 #include <folly/fibers/LoopController.h>
32 #include <folly/fibers/Promise.h>
39 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
40 #if defined(FOLLY_SANITIZE_ADDRESS) || defined(UNDEFINED_SANITIZER) || \
41 defined(FOLLY_SANITIZE_THREAD)
42 /* Sanitizers need a lot of extra stack space.
43 16x is a conservative estimate, 8x also worked with tests
44 where it mattered. Note that overallocating here does not necessarily
45 increase RSS, since unused memory is pretty much free. */
53 inline void FiberManager::ensureLoopScheduled() {
54 if (isLoopScheduled_) {
58 isLoopScheduled_ = true;
59 loopController_->schedule();
62 inline void FiberManager::activateFiber(Fiber* fiber) {
63 DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
65 #ifdef FOLLY_SANITIZE_ADDRESS
66 DCHECK(!fiber->asanMainStackBase_);
67 DCHECK(!fiber->asanMainStackSize_);
68 auto stack = fiber->getStack();
70 registerStartSwitchStackWithAsan(&asanFakeStack, stack.first, stack.second);
72 registerFinishSwitchStackWithAsan(asanFakeStack, nullptr, nullptr);
73 fiber->asanMainStackBase_ = nullptr;
74 fiber->asanMainStackSize_ = 0;
79 fiber->fiberImpl_.activate();
82 inline void FiberManager::deactivateFiber(Fiber* fiber) {
83 DCHECK_EQ(activeFiber_, fiber);
85 #ifdef FOLLY_SANITIZE_ADDRESS
86 DCHECK(fiber->asanMainStackBase_);
87 DCHECK(fiber->asanMainStackSize_);
89 registerStartSwitchStackWithAsan(
90 &fiber->asanFakeStack_,
91 fiber->asanMainStackBase_,
92 fiber->asanMainStackSize_);
94 registerFinishSwitchStackWithAsan(
95 fiber->asanFakeStack_,
96 &fiber->asanMainStackBase_,
97 &fiber->asanMainStackSize_);
98 fiber->asanFakeStack_ = nullptr;
102 activeFiber_ = nullptr;
103 fiber->fiberImpl_.deactivate();
106 inline void FiberManager::runReadyFiber(Fiber* fiber) {
108 assert(currentFiber_ == nullptr);
109 assert(activeFiber_ == nullptr);
113 fiber->state_ == Fiber::NOT_STARTED ||
114 fiber->state_ == Fiber::READY_TO_RUN);
115 currentFiber_ = fiber;
117 observer_->starting(reinterpret_cast<uintptr_t>(fiber));
120 while (fiber->state_ == Fiber::NOT_STARTED ||
121 fiber->state_ == Fiber::READY_TO_RUN) {
122 activateFiber(fiber);
123 if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
127 exceptionCallback_(std::current_exception(), "running immediateFunc_");
129 immediateFunc_ = nullptr;
130 fiber->state_ = Fiber::READY_TO_RUN;
134 if (fiber->state_ == Fiber::AWAITING) {
136 awaitFunc_ = nullptr;
138 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
140 currentFiber_ = nullptr;
141 } else if (fiber->state_ == Fiber::INVALID) {
142 assert(fibersActive_ > 0);
144 // Making sure that task functor is deleted once task is complete.
145 // NOTE: we must do it on main context, as the fiber is not
146 // running at this point.
147 fiber->func_ = nullptr;
148 fiber->resultFunc_ = nullptr;
149 if (fiber->finallyFunc_) {
151 fiber->finallyFunc_();
153 exceptionCallback_(std::current_exception(), "running finallyFunc_");
155 fiber->finallyFunc_ = nullptr;
157 // Make sure LocalData is not accessible from its destructor
159 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
161 currentFiber_ = nullptr;
162 fiber->localData_.reset();
163 fiber->rcontext_.reset();
165 if (fibersPoolSize_ < options_.maxFibersPoolSize ||
166 options_.fibersPoolResizePeriodMs > 0) {
167 fibersPool_.push_front(*fiber);
171 assert(fibersAllocated_ > 0);
174 } else if (fiber->state_ == Fiber::YIELDED) {
176 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
178 currentFiber_ = nullptr;
179 fiber->state_ = Fiber::READY_TO_RUN;
180 yieldedFibers_.push_back(*fiber);
184 inline void FiberManager::loopUntilNoReady() {
185 return loopController_->runLoop();
188 inline void FiberManager::loopUntilNoReadyImpl() {
190 if (UNLIKELY(!alternateSignalStackRegistered_)) {
191 registerAlternateSignalStack();
195 // Support nested FiberManagers
196 auto originalFiberManager = this;
197 std::swap(currentFiberManager_, originalFiberManager);
199 RequestContext::Provider oldRequestContextProvider;
200 auto newRequestContextProvider =
201 [this, &oldRequestContextProvider]() -> std::shared_ptr<RequestContext>& {
202 return currentFiber_ ? currentFiber_->rcontext_
203 : oldRequestContextProvider();
205 oldRequestContextProvider = RequestContext::setRequestContextProvider(
206 std::ref(newRequestContextProvider));
209 isLoopScheduled_ = false;
210 // Restore RequestContext provider before call to ensureLoopScheduled()
211 RequestContext::setRequestContextProvider(
212 std::move(oldRequestContextProvider));
213 if (!readyFibers_.empty()) {
214 ensureLoopScheduled();
216 std::swap(currentFiberManager_, originalFiberManager);
217 CHECK_EQ(this, originalFiberManager);
220 bool hadRemoteFiber = true;
221 while (hadRemoteFiber) {
222 hadRemoteFiber = false;
224 while (!readyFibers_.empty()) {
225 auto& fiber = readyFibers_.front();
226 readyFibers_.pop_front();
227 runReadyFiber(&fiber);
230 remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
231 runReadyFiber(fiber);
232 hadRemoteFiber = true;
235 remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
236 std::unique_ptr<RemoteTask> task(taskPtr);
237 auto fiber = getFiber();
238 if (task->localData) {
239 fiber->localData_ = *task->localData;
241 fiber->rcontext_ = std::move(task->rcontext);
243 fiber->setFunction(std::move(task->func));
245 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
247 runReadyFiber(fiber);
248 hadRemoteFiber = true;
253 for (auto& yielded : yieldedFibers_) {
254 observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
257 readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
260 // We need this to be in a struct, not inlined in addTask, because clang crashes
262 template <typename F>
263 struct FiberManager::AddTaskHelper {
266 static constexpr bool allocateInBuffer =
267 sizeof(Func) <= Fiber::kUserBufferSize;
271 Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
277 fm_.exceptionCallback_(
278 std::current_exception(), "running Func functor");
280 if (allocateInBuffer) {
293 template <typename F>
294 void FiberManager::addTask(F&& func) {
295 typedef AddTaskHelper<F> Helper;
297 auto fiber = getFiber();
298 initLocalData(*fiber);
300 if (Helper::allocateInBuffer) {
301 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
302 new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
304 fiber->setFunction(std::ref(*funcLoc));
306 auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
308 fiber->setFunction(std::ref(*funcLoc));
311 readyFibers_.push_back(*fiber);
313 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
316 ensureLoopScheduled();
319 template <typename F>
320 void FiberManager::addTaskRemote(F&& func) {
322 auto currentFm = getFiberManagerUnsafe();
323 if (currentFm && currentFm->currentFiber_ &&
324 currentFm->localType_ == localType_) {
325 return std::make_unique<RemoteTask>(
326 std::forward<F>(func), currentFm->currentFiber_->localData_);
328 return std::make_unique<RemoteTask>(std::forward<F>(func));
330 auto insertHead = [&]() {
331 return remoteTaskQueue_.insertHead(task.release());
333 loopController_->scheduleThreadSafe(std::ref(insertHead));
336 template <typename X>
337 struct IsRvalueRefTry {
338 static const bool value = false;
340 template <typename T>
341 struct IsRvalueRefTry<folly::Try<T>&&> {
342 static const bool value = true;
345 // We need this to be in a struct, not inlined in addTaskFinally, because clang
346 // crashes otherwise.
347 template <typename F, typename G>
348 struct FiberManager::AddTaskFinallyHelper {
351 typedef typename std::result_of<F()>::type Result;
355 Finally(G finally, FiberManager& fm)
356 : finally_(std::move(finally)), fm_(fm) {}
360 finally_(std::move(*result_));
362 fm_.exceptionCallback_(
363 std::current_exception(), "running Finally functor");
366 if (allocateInBuffer) {
377 folly::Optional<folly::Try<Result>> result_;
383 Func(F func, Finally& finally)
384 : func_(std::move(func)), result_(finally.result_) {}
387 result_ = folly::makeTryWith(std::move(func_));
389 if (allocateInBuffer) {
398 folly::Optional<folly::Try<Result>>& result_;
401 static constexpr bool allocateInBuffer =
402 sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
405 template <typename F, typename G>
406 void FiberManager::addTaskFinally(F&& func, G&& finally) {
407 typedef typename std::result_of<F()>::type Result;
410 IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
411 "finally(arg): arg must be Try<T>&&");
415 typename std::remove_reference<
416 typename FirstArgOf<G>::type>::type::element_type>::value,
417 "finally(Try<T>&&): T must be convertible from func()'s return type");
419 auto fiber = getFiber();
420 initLocalData(*fiber);
422 typedef AddTaskFinallyHelper<
423 typename std::decay<F>::type,
424 typename std::decay<G>::type>
427 if (Helper::allocateInBuffer) {
428 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
430 static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
432 new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
433 new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
435 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
438 new typename Helper::Finally(std::forward<G>(finally), *this);
440 new typename Helper::Func(std::forward<F>(func), *finallyLoc);
442 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
445 readyFibers_.push_back(*fiber);
447 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
450 ensureLoopScheduled();
453 template <typename F>
454 typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
455 if (UNLIKELY(activeFiber_ == nullptr)) {
459 typedef typename std::result_of<F()>::type Result;
461 folly::Try<Result> result;
462 auto f = [&func, &result]() mutable {
463 result = folly::makeTryWith(std::forward<F>(func));
466 immediateFunc_ = std::ref(f);
467 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
469 return std::move(result).value();
472 inline FiberManager& FiberManager::getFiberManager() {
473 assert(currentFiberManager_ != nullptr);
474 return *currentFiberManager_;
477 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
478 return currentFiberManager_;
481 inline bool FiberManager::hasActiveFiber() const {
482 return activeFiber_ != nullptr;
485 inline void FiberManager::yield() {
486 assert(currentFiberManager_ == this);
487 assert(activeFiber_ != nullptr);
488 assert(activeFiber_->state_ == Fiber::RUNNING);
489 activeFiber_->preempt(Fiber::YIELDED);
492 template <typename T>
493 T& FiberManager::local() {
494 if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
495 return currentFiber_->localData_.get<T>();
497 return localThread<T>();
500 template <typename T>
501 T& FiberManager::localThread() {
503 static thread_local T t;
505 #else // osx doesn't support thread_local
506 static ThreadLocal<T> t;
511 inline void FiberManager::initLocalData(Fiber& fiber) {
512 auto fm = getFiberManagerUnsafe();
513 if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
514 fiber.localData_ = fm->currentFiber_->localData_;
516 fiber.rcontext_ = RequestContext::saveContext();
519 template <typename LocalT>
520 FiberManager::FiberManager(
522 std::unique_ptr<LoopController> loopController__,
524 : loopController_(std::move(loopController__)),
525 stackAllocator_(options.useGuardPages),
526 options_(preprocessOptions(std::move(options))),
527 exceptionCallback_([](std::exception_ptr eptr, std::string context) {
529 std::rethrow_exception(eptr);
530 } catch (const std::exception& e) {
531 LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
532 << e.what() << "' was thrown in "
533 << "FiberManager with context '" << context << "'";
535 LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
536 << "context '" << context << "'";
539 timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
540 fibersPoolResizer_(*this),
541 localType_(typeid(LocalT)) {
542 loopController_->setFiberManager(this);
545 template <typename F>
546 typename FirstArgOf<F>::type::value_type inline await(F&& func) {
547 typedef typename FirstArgOf<F>::type::value_type Result;
548 typedef typename FirstArgOf<F>::type::baton_type BatonT;
550 return Promise<Result, BatonT>::await(std::forward<F>(func));