2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
26 #include <folly/ThreadLocal.h>
28 #include <folly/fibers/Baton.h>
29 #include <folly/fibers/Fiber.h>
30 #include <folly/fibers/LoopController.h>
31 #include <folly/fibers/Promise.h>
32 #include <folly/Try.h>
// Takes a FiberManager::Options by value and returns a FiberManager::Options.
// Only the address-sanitizer branch's explanatory comment is visible in this
// excerpt; the statements that adjust and return `opts` are not shown.
// NOTE(review): presumably the stack size is multiplied under ASAN (the
// comment below mentions 16x) — confirm against the full body.
39 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
40 #ifdef FOLLY_SANITIZE_ADDRESS
41 /* ASAN needs a lot of extra stack space.
42 16x is a conservative estimate, 8x also worked with tests
43 where it mattered. Note that overallocating here does not necessarily
44 increase RSS, since unused memory is pretty much free. */
// Marks the loop as scheduled (isLoopScheduled_ = true) and asks the loop
// controller to schedule it.
52 inline void FiberManager::ensureLoopScheduled() {
// Branch taken when the flag is already set; its body is not visible in this
// excerpt (presumably an early return so schedule() is not called twice —
// confirm).
53 if (isLoopScheduled_) {
57 isLoopScheduled_ = true;
58 loopController_->schedule();
// Transfers execution from the main context into `fiber`'s context.
// Debug builds check that no fiber is active on entry.
61 inline void FiberManager::activateFiber(Fiber* fiber) {
62 DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
// Address-sanitizer builds only: tell ASAN about the stack switch.
64 #ifdef FOLLY_SANITIZE_ADDRESS
// The fiber must not already carry a recorded main-stack base/size.
65 DCHECK(!fiber->asanMainStackBase_);
66 DCHECK(!fiber->asanMainStackSize_);
67 auto stack = fiber->getStack();
// stack.first / stack.second are passed as the target stack description.
// NOTE(review): the declaration of `asanFakeStack` is not visible here.
69 registerStartSwitchStackWithAsan(&asanFakeStack, stack.first, stack.second);
// NOTE(review): the three statements below presumably live in a scope guard
// that runs once control comes back to the main context — the enclosing
// construct is not visible in this excerpt; confirm.
71 registerFinishSwitchStackWithAsan(asanFakeStack, nullptr, nullptr);
72 fiber->asanMainStackBase_ = nullptr;
73 fiber->asanMainStackSize_ = 0;
// Argument list of the context-switch call: from mainContext_ to the fiber's
// fcontext_, passing the fiber pointer as an integer payload. The callee name
// is not visible here (presumably jumpContext, as used in deactivateFiber).
79 &mainContext_, &fiber->fcontext_, reinterpret_cast<intptr_t>(fiber));
// Transfers execution from `fiber`'s context back to the main context.
// Debug builds check that `fiber` is the currently active fiber on entry.
82 inline void FiberManager::deactivateFiber(Fiber* fiber) {
83 DCHECK_EQ(activeFiber_, fiber);
// Address-sanitizer builds only: tell ASAN about the stack switch.
85 #ifdef FOLLY_SANITIZE_ADDRESS
// A main-stack base and size must have been recorded for this fiber.
86 DCHECK(fiber->asanMainStackBase_);
87 DCHECK(fiber->asanMainStackSize_);
89 // Release fake stack if fiber is completed
// nullptr is passed when the fiber's state is INVALID; otherwise the address
// of the fiber's asanFakeStack_ slot is passed.
90 auto saveFakeStackPtr =
91 fiber->state_ == Fiber::INVALID ? nullptr : &fiber->asanFakeStack_;
92 registerStartSwitchStackWithAsan(
93 saveFakeStackPtr, fiber->asanMainStackBase_, fiber->asanMainStackSize_);
// Finish-switch call followed by clearing asanFakeStack_.
// NOTE(review): presumably wrapped in a scope guard that runs when this
// fiber is resumed — the enclosing construct is not visible here; confirm.
95 registerFinishSwitchStackWithAsan(
96 fiber->asanFakeStack_,
97 &fiber->asanMainStackBase_,
98 &fiber->asanMainStackSize_);
99 fiber->asanFakeStack_ = nullptr;
// No fiber is active once we leave; then jump from the fiber's context to
// the main context with payload 0.
103 activeFiber_ = nullptr;
104 auto context = jumpContext(&fiber->fcontext_, &mainContext_, 0);
// The value handed back by the jump is expected to be this fiber's pointer.
105 DCHECK_EQ(fiber, reinterpret_cast<Fiber*>(context));
// Runs `fiber` while it is in the NOT_STARTED or READY_TO_RUN state, then
// handles the state it stopped in: AWAITING, INVALID (task finished) or
// YIELDED.
// NOTE(review): several lines of this function (try/catch heads, observer
// guards, counter updates, closing braces) are not visible in this excerpt.
108 inline void FiberManager::runReadyFiber(Fiber* fiber) {
// Must be entered with no current fiber and no active fiber.
110 assert(currentFiber_ == nullptr);
111 assert(activeFiber_ == nullptr);
// Condition fragment: the fiber has to be NOT_STARTED or READY_TO_RUN. The
// enclosing statement is not visible (presumably an assert — confirm).
115 fiber->state_ == Fiber::NOT_STARTED ||
116 fiber->state_ == Fiber::READY_TO_RUN);
117 currentFiber_ = fiber;
// Hand the fiber's request context to RequestContext::setContext and keep
// whatever setContext returns in fiber->rcontext_ (presumably the context
// that was installed before — confirm).
118 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
// NOTE(review): observer_ calls in this function are presumably guarded by a
// null check on a line that is not visible.
120 observer_->starting(reinterpret_cast<uintptr_t>(fiber));
// Keep switching into the fiber for as long as it is runnable.
123 while (fiber->state_ == Fiber::NOT_STARTED ||
124 fiber->state_ == Fiber::READY_TO_RUN) {
125 activateFiber(fiber);
// AWAITING_IMMEDIATE is turned back into READY_TO_RUN below, so the loop
// switches into the fiber again. The invocation of immediateFunc_ itself is
// not visible; an exception is reported through exceptionCallback_.
126 if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
130 exceptionCallback_(std::current_exception(), "running immediateFunc_");
132 immediateFunc_ = nullptr;
133 fiber->state_ = Fiber::READY_TO_RUN;
// Fiber is awaiting: awaitFunc_ is cleared (its invocation is not visible
// here), the observer is told the fiber stopped, and the request context is
// swapped again.
137 if (fiber->state_ == Fiber::AWAITING) {
139 awaitFunc_ = nullptr;
141 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
143 currentFiber_ = nullptr;
144 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
// Fiber's state is INVALID: release what it holds.
145 } else if (fiber->state_ == Fiber::INVALID) {
146 assert(fibersActive_ > 0);
148 // Making sure that task functor is deleted once task is complete.
149 // NOTE: we must do it on main context, as the fiber is not
150 // running at this point.
151 fiber->func_ = nullptr;
152 fiber->resultFunc_ = nullptr;
// Run the optional finally callback; an exception from it is reported via
// exceptionCallback_ (the try/catch lines are not visible).
153 if (fiber->finallyFunc_) {
155 fiber->finallyFunc_();
157 exceptionCallback_(std::current_exception(), "running finallyFunc_");
159 fiber->finallyFunc_ = nullptr;
161 // Make sure LocalData is not accessible from its destructor
163 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
// currentFiber_ is cleared before localData_ is reset, per the comment above.
165 currentFiber_ = nullptr;
166 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
167 fiber->localData_.reset();
168 fiber->rcontext_.reset();
// Keep the fiber in the pool if the pool is below maxFibersPoolSize or
// fibersPoolResizePeriodMs is positive. The alternative branch is not fully
// visible here.
170 if (fibersPoolSize_ < options_.maxFibersPoolSize ||
171 options_.fibersPoolResizePeriodMs > 0) {
172 fibersPool_.push_front(*fiber);
176 assert(fibersAllocated_ > 0);
// Fiber yielded: mark it READY_TO_RUN and queue it on yieldedFibers_.
179 } else if (fiber->state_ == Fiber::YIELDED) {
181 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
183 currentFiber_ = nullptr;
184 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
185 fiber->state_ = Fiber::READY_TO_RUN;
186 yieldedFibers_.push_back(*fiber);
// Runs locally ready fibers, remotely readied fibers and remotely submitted
// tasks, repeating while a pass found remote work. Returns true when
// fibersActive_ is still greater than zero.
// NOTE(review): scope-guard heads, observer guards and closing braces are not
// visible in this excerpt.
190 inline bool FiberManager::loopUntilNoReady() {
// Register the alternate signal stack if the flag says it is not registered.
192 if (UNLIKELY(!alternateSignalStackRegistered_)) {
193 registerAlternateSignalStack();
197 // Support nested FiberManagers
// After the swap, currentFiberManager_ is `this` and originalFiberManager
// holds the previous value.
198 auto originalFiberManager = this;
199 std::swap(currentFiberManager_, originalFiberManager);
// Cleanup statements: clear the scheduled flag, re-schedule if ready fibers
// remain, and swap the previous manager back (checked with CHECK_EQ).
// NOTE(review): presumably the body of a scope guard that runs on exit — the
// enclosing construct is not visible here; confirm.
202 isLoopScheduled_ = false;
203 if (!readyFibers_.empty()) {
204 ensureLoopScheduled();
206 std::swap(currentFiberManager_, originalFiberManager);
207 CHECK_EQ(this, originalFiberManager);
// Starts true so the loop body executes at least once; each pass resets it
// and the sweep callbacks below set it again when they run something.
210 bool hadRemoteFiber = true;
211 while (hadRemoteFiber) {
212 hadRemoteFiber = false;
// Drain the local ready queue; each fiber is unlinked before it is run.
214 while (!readyFibers_.empty()) {
215 auto& fiber = readyFibers_.front();
216 readyFibers_.pop_front();
217 runReadyFiber(&fiber);
// Run every fiber found in the remote ready queue.
220 remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
221 runReadyFiber(fiber);
222 hadRemoteFiber = true;
// Turn every remote task into a fiber and run it.
225 remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
// Take ownership of the raw task pointer for the duration of the callback.
226 std::unique_ptr<RemoteTask> task(taskPtr);
227 auto fiber = getFiber();
// Copy the task's local data into the fiber when the task carries some.
228 if (task->localData) {
229 fiber->localData_ = *task->localData;
231 fiber->rcontext_ = std::move(task->rcontext);
233 fiber->setFunction(std::move(task->func));
235 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
237 runReadyFiber(fiber);
238 hadRemoteFiber = true;
// Report each yielded fiber as runnable, then move all of them to the end of
// the ready queue.
243 for (auto& yielded : yieldedFibers_) {
244 observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
247 readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
249 return fibersActive_ > 0;
// Helper used by addTask: wraps the user callable F in a Func object that
// stores the callable (func_) and a reference to the FiberManager (fm_).
// NOTE(review): the Func class head, its call operator and its members are
// not visible in this excerpt.
252 // We need this to be in a struct, not inlined in addTask, because clang crashes
254 template <typename F>
255 struct FiberManager::AddTaskHelper {
// True when a Func fits into Fiber::kUserBufferSize bytes.
258 static constexpr bool allocateInBuffer =
259 sizeof(Func) <= Fiber::kUserBufferSize;
263 Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
// An exception is reported through the manager's exceptionCallback_ with the
// context string below (the try/catch around the call is not visible).
269 fm_.exceptionCallback_(
270 std::current_exception(), "running Func functor");
// Branch on where this Func was allocated; its body is not visible here
// (presumably in-place destruction vs. delete — confirm).
272 if (allocateInBuffer) {
// Creates a fiber task from `func`, appends it to readyFibers_ and makes sure
// the loop is scheduled.
285 template <typename F>
286 void FiberManager::addTask(F&& func) {
287 typedef AddTaskHelper<F> Helper;
289 auto fiber = getFiber();
290 initLocalData(*fiber);
// When the wrapper fits, construct it with placement new inside the fiber's
// user buffer and hand the fiber a reference to it.
292 if (Helper::allocateInBuffer) {
293 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
294 new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
296 fiber->setFunction(std::ref(*funcLoc));
// Otherwise construct the wrapper with plain `new`.
// NOTE(review): the matching release is not visible in this function —
// presumably the wrapper frees itself after running; confirm.
298 auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
300 fiber->setFunction(std::ref(*funcLoc));
303 readyFibers_.push_back(*fiber);
// NOTE(review): presumably guarded by a null check on observer_ that is not
// visible here.
305 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
308 ensureLoopScheduled();
// Wraps `func` in a RemoteTask, inserts it at the head of remoteTaskQueue_
// and hands that insertion to loopController_->scheduleThreadSafe.
// NOTE(review): the line declaring `task` (which the returns below feed) is
// not visible in this excerpt.
311 template <typename F>
312 void FiberManager::addTaskRemote(F&& func) {
314 auto currentFm = getFiberManagerUnsafe();
// When called from a fiber of a manager with the same localType_, the task is
// built with that fiber's localData_ as well.
315 if (currentFm && currentFm->currentFiber_ &&
316 currentFm->localType_ == localType_) {
317 return folly::make_unique<RemoteTask>(
318 std::forward<F>(func), currentFm->currentFiber_->localData_);
// Otherwise the task is built from the callable alone.
320 return folly::make_unique<RemoteTask>(std::forward<F>(func));
// The queue receives the raw pointer released from `task`; the lambda
// captures by reference and is passed via std::ref.
322 auto insertHead = [&]() {
323 return remoteTaskQueue_.insertHead(task.release());
325 loopController_->scheduleThreadSafe(std::ref(insertHead));
// Type trait, primary template: `value` is false for any type X.
328 template <typename X>
329 struct IsRvalueRefTry {
330 static const bool value = false;
// Specialization for folly::Try<T>&&: `value` is true.
332 template <typename T>
333 struct IsRvalueRefTry<folly::Try<T>&&> {
334 static const bool value = true;
// Helper used by addTaskFinally. It holds two wrappers: Finally, which stores
// the finally callable, the FiberManager reference and an optional
// Try<Result>; and Func, which stores the task callable and a reference to
// the Finally object's result_.
// NOTE(review): class heads, call operators and several members are not
// visible in this excerpt.
337 // We need this to be in a struct, not inlined in addTaskFinally, because clang
338 // crashes otherwise.
339 template <typename F, typename G>
340 struct FiberManager::AddTaskFinallyHelper {
// Result is the type produced by calling F with no arguments.
343 typedef typename std::result_of<F()>::type Result;
347 Finally(G finally, FiberManager& fm)
348 : finally_(std::move(finally)), fm_(fm) {}
// Pass the stored result to the finally callable by rvalue.
352 finally_(std::move(*result_));
// An exception is reported through the manager's exceptionCallback_ (the
// try/catch lines are not visible).
354 fm_.exceptionCallback_(
355 std::current_exception(), "running Finally functor");
// Branch on where the object was allocated; its body is not visible here.
358 if (allocateInBuffer) {
// Empty until Func stores the task's outcome through its reference.
369 folly::Optional<folly::Try<Result>> result_;
// Func binds its result_ reference to the Finally object's result_.
375 Func(F func, Finally& finally)
376 : func_(std::move(func)), result_(finally.result_) {}
// Store the outcome of func_ as a Try via folly::makeTryWith.
379 result_ = folly::makeTryWith(std::move(func_));
381 if (allocateInBuffer) {
390 folly::Optional<folly::Try<Result>>& result_;
// True when Func and Finally together fit into Fiber::kUserBufferSize bytes.
393 static constexpr bool allocateInBuffer =
394 sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
// Creates a fiber task from `func` plus a `finally` callable, appends the
// fiber to readyFibers_ and makes sure the loop is scheduled.
// NOTE(review): the static_assert heads, the Helper typedef name and the
// declarations of finallyLoc/funcLoc are partly not visible in this excerpt.
397 template <typename F, typename G>
398 void FiberManager::addTaskFinally(F&& func, G&& finally) {
399 typedef typename std::result_of<F()>::type Result;
// Compile-time requirement: finally's first argument is Try<T>&&.
402 IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
403 "finally(arg): arg must be Try<T>&&");
// Compile-time requirement relating func()'s return type to the Try's
// element_type (the trait being applied is not visible here).
407 typename std::remove_reference<
408 typename FirstArgOf<G>::type>::type::element_type>::value,
409 "finally(Try<T>&&): T must be convertible from func()'s return type");
411 auto fiber = getFiber();
412 initLocalData(*fiber);
// Helper is instantiated with the decayed callable types.
414 typedef AddTaskFinallyHelper<
415 typename std::decay<F>::type,
416 typename std::decay<G>::type>
// In-buffer case: Func goes at the start of the user buffer and Finally
// directly after it (funcLoc + 1). Finally is constructed first because Func
// takes a reference to it.
419 if (Helper::allocateInBuffer) {
420 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
422 static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
424 new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
425 new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
427 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
// Heap case: both wrappers are created with plain `new`.
// NOTE(review): the matching release is not visible in this function; confirm
// the wrappers free themselves.
430 new typename Helper::Finally(std::forward<G>(finally), *this);
432 new typename Helper::Func(std::forward<F>(func), *finallyLoc);
434 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
437 readyFibers_.push_back(*fiber);
439 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
442 ensureLoopScheduled();
// Runs `func` through the immediateFunc_ mechanism and returns its result:
// the active fiber is preempted with AWAITING_IMMEDIATE, and the result
// captured in a folly::Try is returned afterwards.
445 template <typename F>
446 typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
// No fiber is active: the body of this branch is not visible in this excerpt
// (presumably func is simply called and its result returned — confirm).
447 if (UNLIKELY(activeFiber_ == nullptr)) {
451 typedef typename std::result_of<F()>::type Result;
453 folly::Try<Result> result;
// `f` captures func and result by reference and stores func's outcome as a
// Try.
454 auto f = [&func, &result]() mutable {
455 result = folly::makeTryWith(std::forward<F>(func));
// Publish f by reference, then preempt the active fiber.
458 immediateFunc_ = std::ref(f);
459 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
// Extract the value from the Try (presumably rethrows if func threw —
// confirm against folly::Try::value()).
461 return std::move(result).value();
// Returns a reference to the object currentFiberManager_ points to. Asserts
// that the pointer is non-null.
464 inline FiberManager& FiberManager::getFiberManager() {
465 assert(currentFiberManager_ != nullptr);
466 return *currentFiberManager_;
// Returns currentFiberManager_ as a pointer without any null check; callers
// in this file test the result before dereferencing it.
469 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
470 return currentFiberManager_;
// Returns true when activeFiber_ is non-null.
473 inline bool FiberManager::hasActiveFiber() const {
474 return activeFiber_ != nullptr;
// Preempts the active fiber with state YIELDED.
477 inline void FiberManager::yield() {
// Preconditions: this manager is the current one, a fiber is active, and that
// fiber is in the RUNNING state.
478 assert(currentFiberManager_ == this);
479 assert(activeFiber_ != nullptr);
480 assert(activeFiber_->state_ == Fiber::RUNNING);
481 activeFiber_->preempt(Fiber::YIELDED);
// Returns a reference to local storage of type T: the current fiber's
// localData_ when T matches localType_ and a current fiber exists, otherwise
// the result of localThread<T>().
484 template <typename T>
485 T& FiberManager::local() {
486 if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
487 return currentFiber_->localData_.get<T>();
489 return localThread<T>();
// Returns a reference to a T instance with thread-local storage.
// NOTE(review): the preprocessor condition selecting between the two
// variants and the return statements are not visible in this excerpt.
492 template <typename T>
493 T& FiberManager::localThread() {
// Variant using the language-level thread_local keyword.
495 static thread_local T t;
// Fallback variant using the ThreadLocal<T> wrapper.
497 #else // osx doesn't support thread_local
498 static ThreadLocal<T> t;
// Initializes `fiber`'s local data and request context before it is queued.
503 inline void FiberManager::initLocalData(Fiber& fiber) {
504 auto fm = getFiberManagerUnsafe();
// When called from a fiber whose manager has the same localType_, copy that
// fiber's localData_ into the new fiber.
505 if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
506 fiber.localData_ = fm->currentFiber_->localData_;
// Store the result of RequestContext::saveContext() as the fiber's context.
508 fiber.rcontext_ = RequestContext::saveContext();
// Constructor templated on the fiber-local data type LocalT. Takes ownership
// of the loop controller and registers this manager with it.
// NOTE(review): part of the parameter list (including `options`) is not
// visible in this excerpt.
511 template <typename LocalT>
512 FiberManager::FiberManager(
514 std::unique_ptr<LoopController> loopController__,
516 : loopController_(std::move(loopController__)),
// Both initializers below read the `options` parameter; the actual order
// follows the member declaration order, which is not visible here.
517 stackAllocator_(options.useGuardPages),
518 options_(preprocessOptions(std::move(options))),
// Default exception callback: rethrows the captured exception to classify it
// and logs it with LOG(DFATAL), including the context string.
519 exceptionCallback_([](std::exception_ptr eptr, std::string context) {
521 std::rethrow_exception(eptr);
// std::exception-derived: log the dynamic type name and what().
522 } catch (const std::exception& e) {
523 LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
524 << e.what() << "' was thrown in "
525 << "FiberManager with context '" << context << "'";
// Fallback log message for the remaining case (its catch head is not visible
// here).
527 LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
528 << "context '" << context << "'";
531 timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
532 fibersPoolResizer_(*this),
// Remember LocalT's type so local<T>() can compare against it.
533 localType_(typeid(LocalT)) {
534 loopController_->setFiberManager(this);
// Free function: forwards `func` to Promise<Result, BatonT>::await and
// returns its result. Result and BatonT are taken from the type of func's
// first argument (its value_type and baton_type members).
537 template <typename F>
538 typename FirstArgOf<F>::type::value_type inline await(F&& func) {
539 typedef typename FirstArgOf<F>::type::value_type Result;
540 typedef typename FirstArgOf<F>::type::baton_type BatonT;
542 return Promise<Result, BatonT>::await(std::forward<F>(func));