2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
26 #include <folly/ThreadLocal.h>
28 #include <folly/experimental/fibers/Baton.h>
29 #include <folly/experimental/fibers/Fiber.h>
30 #include <folly/experimental/fibers/LoopController.h>
31 #include <folly/experimental/fibers/Promise.h>
32 #include <folly/futures/Try.h>
34 namespace folly { namespace fibers {
// Takes the FiberManager options by value and returns a FiberManager::Options.
// When FOLLY_SANITIZE_ADDRESS is defined, the note below explains that ASAN
// builds need far more stack space (16x is quoted as a conservative estimate).
// NOTE(review): the statements that apply the adjustment and return the
// result are not visible in this excerpt -- confirm against the full file.
38 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
39 #ifdef FOLLY_SANITIZE_ADDRESS
40 /* ASAN needs a lot of extra stack space.
41 16x is a conservative estimate, 8x also worked with tests
42 where it mattered. Note that overallocating here does not necessarily
43 increase RSS, since unused memory is pretty much free. */
// Asks the loop controller to schedule a loop run and records that fact in
// isLoopScheduled_.
// NOTE(review): the body of the `if (isLoopScheduled_)` guard is not visible
// in this excerpt; presumably it returns early so schedule() is not called
// twice for the same pending run -- confirm.
51 inline void FiberManager::ensureLoopScheduled() {
52 if (isLoopScheduled_) {
// Set the flag first, then hand the request to the loop controller.
56 isLoopScheduled_ = true;
57 loopController_->schedule();
// Runs one fiber from the main context until it leaves the runnable states,
// then performs the bookkeeping that matches the state the fiber stopped in
// (AWAITING, INVALID or YIELDED).
//
// `fiber` must be NOT_STARTED or READY_TO_RUN, and no fiber may be current or
// active on entry; all of this is asserted below.
//
// NOTE(review): several short lines of this function are not visible in this
// excerpt; the comments below describe only what is shown.
60 inline void FiberManager::runReadyFiber(Fiber* fiber) {
62 assert(currentFiber_ == nullptr);
63 assert(activeFiber_ == nullptr);
66 assert(fiber->state_ == Fiber::NOT_STARTED ||
67 fiber->state_ == Fiber::READY_TO_RUN);
// Publish the fiber as current and pass its saved request context to
// RequestContext::setContext, keeping the returned value in fiber->rcontext_
// (presumably the previously installed context, so that the identical call on
// the way out swaps it back -- confirm).
68 currentFiber_ = fiber;
69 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
71 observer_->starting(reinterpret_cast<uintptr_t>(fiber));
// Keep jumping into the fiber while it is still runnable. A stop in
// AWAITING_IMMEDIATE is handled right here and the fiber is then marked
// READY_TO_RUN, so the loop condition resumes it on the next iteration.
74 while (fiber->state_ == Fiber::NOT_STARTED ||
75 fiber->state_ == Fiber::READY_TO_RUN) {
77 jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
78 if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
// A failure is reported through exceptionCallback_ with a context string.
// NOTE(review): the invocation of immediateFunc_ and the try/catch around it
// are not visible in this excerpt -- confirm.
82 exceptionCallback_(std::current_exception(), "running immediateFunc_");
84 immediateFunc_ = nullptr;
85 fiber->state_ = Fiber::READY_TO_RUN;
89 if (fiber->state_ == Fiber::AWAITING) {
// Fiber stopped in AWAITING: notify the observer, clear currentFiber_ and run
// the request-context exchange again.
93 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
95 currentFiber_ = nullptr;
96 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
97 } else if (fiber->state_ == Fiber::INVALID) {
// Fiber reached INVALID; per the notes below the task is complete, so its
// functors are released here on the main context.
98 assert(fibersActive_ > 0);
100 // Making sure that task functor is deleted once task is complete.
101 // NOTE: we must do it on main context, as the fiber is not
102 // running at this point.
103 fiber->func_ = nullptr;
104 fiber->resultFunc_ = nullptr;
// Run the finally callback if one was set; a failure is reported through
// exceptionCallback_ (the surrounding try/catch is not visible here).
105 if (fiber->finallyFunc_) {
107 fiber->finallyFunc_();
109 exceptionCallback_(std::current_exception(), "running finallyFunc_");
111 fiber->finallyFunc_ = nullptr;
113 // Make sure LocalData is not accessible from its destructor
115 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
117 currentFiber_ = nullptr;
118 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
// currentFiber_ is already null at this point, so the local data and request
// context are dropped while no fiber is marked as current.
119 fiber->localData_.reset();
120 fiber->rcontext_.reset();
// Put the fiber at the front of fibersPool_ when the pool is below
// options_.maxFibersPoolSize or options_.fibersPoolResizePeriodMs is positive.
122 if (fibersPoolSize_ < options_.maxFibersPoolSize ||
123 options_.fibersPoolResizePeriodMs > 0) {
124 fibersPool_.push_front(*fiber);
// NOTE(review): the alternative path is only partly visible; it asserts that
// at least one fiber is allocated. Presumably the fiber is destroyed there
// instead of being pooled -- confirm.
128 assert(fibersAllocated_ > 0);
131 } else if (fiber->state_ == Fiber::YIELDED) {
// Fiber yielded: same observer/context bookkeeping as above, then mark it
// READY_TO_RUN and queue it on yieldedFibers_ (loopUntilNoReady splices that
// list onto readyFibers_).
133 observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
135 currentFiber_ = nullptr;
136 fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
137 fiber->state_ = Fiber::READY_TO_RUN;
138 yieldedFibers_.push_back(*fiber);
// Runs fibers until no more are ready: drains readyFibers_, then sweeps
// remoteReadyQueue_ and remoteTaskQueue_, and repeats for as long as a sweep
// handled at least one fiber. Returns true while fibersActive_ is positive.
142 inline bool FiberManager::loopUntilNoReady() {
// The next statements clear isLoopScheduled_, re-schedule the loop when
// readyFibers_ is not empty, and reset currentFiberManager_ to nullptr.
// NOTE(review): resetting currentFiberManager_ right before assigning `this`
// suggests these statements sit inside a deferred cleanup block
// (folly/ScopeGuard.h is included); the wrapper is not visible in this
// excerpt -- confirm.
144 isLoopScheduled_ = false;
145 if (!readyFibers_.empty()) {
146 ensureLoopScheduled();
148 currentFiberManager_ = nullptr;
151 currentFiberManager_ = this;
// Starts as true so the loop body runs at least once; each pass resets it and
// the sweep callbacks set it again when they handled a fiber.
153 bool hadRemoteFiber = true;
154 while (hadRemoteFiber) {
155 hadRemoteFiber = false;
// Run every locally queued fiber. The fiber is unlinked from readyFibers_
// before it runs.
157 while (!readyFibers_.empty()) {
158 auto& fiber = readyFibers_.front();
159 readyFibers_.pop_front();
160 runReadyFiber(&fiber);
// Run every fiber handed over through remoteReadyQueue_.
163 remoteReadyQueue_.sweep(
164 [this, &hadRemoteFiber] (Fiber* fiber) {
165 runReadyFiber(fiber);
166 hadRemoteFiber = true;
// Turn every queued RemoteTask into a fiber and run it immediately. The
// unique_ptr takes ownership of the raw task pointer, so the task is freed
// when the callback returns.
170 remoteTaskQueue_.sweep(
171 [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
172 std::unique_ptr<RemoteTask> task(taskPtr);
173 auto fiber = getFiber();
// Copy the local data captured with the task, if any, and move its request
// context and function into the fiber.
174 if (task->localData) {
175 fiber->localData_ = *task->localData;
177 fiber->rcontext_ = std::move(task->rcontext);
179 fiber->setFunction(std::move(task->func));
180 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
182 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
184 runReadyFiber(fiber);
185 hadRemoteFiber = true;
// Report each yielded fiber to the observer as runnable, then move the whole
// yieldedFibers_ list to the end of readyFibers_.
191 for (auto& yielded : yieldedFibers_) {
192 observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
195 readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
197 return fibersActive_ > 0;
// Helper for addTask: declares the nested Func wrapper that addTask constructs
// from the user callable, and the allocateInBuffer flag that addTask reads.
// NOTE(review): most of this struct (the Func class header, its call operator
// and its data members) is not visible in this excerpt; the comments below
// cover only the visible lines.
200 // We need this to be in a struct, not inlined in addTask, because clang crashes
202 template <typename F>
203 struct FiberManager::AddTaskHelper {
// True when a Func object fits into Fiber::kUserBufferSize bytes.
206 static constexpr bool allocateInBuffer =
207 sizeof(Func) <= Fiber::kUserBufferSize;
// Forwards the user callable into func_ and stores a reference to the manager.
211 Func(F&& func, FiberManager& fm) :
212 func_(std::forward<F>(func)), fm_(fm) {}
// A failure is reported to the manager's exceptionCallback_ with a context
// string (the try/catch around the call of func_ is not visible here).
218 fm_.exceptionCallback_(std::current_exception(),
219 "running Func functor");
// NOTE(review): the two cleanup branches selected by allocateInBuffer are not
// visible; presumably in-place destruction versus delete -- confirm.
221 if (allocateInBuffer) {
// Queues `func` to run on a fiber of this manager: obtains a fiber from
// getFiber(), initializes its local data, wraps func in a Helper::Func,
// appends the fiber to readyFibers_ and makes sure a loop run is scheduled.
234 template <typename F>
235 void FiberManager::addTask(F&& func) {
236 typedef AddTaskHelper<F> Helper;
238 auto fiber = getFiber();
239 initLocalData(*fiber);
// When the wrapper fits, construct it with placement new inside the fiber's
// user buffer; the fiber stores only a std::ref to it.
241 if (Helper::allocateInBuffer) {
242 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
243 new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
245 fiber->setFunction(std::ref(*funcLoc));
// Otherwise (the `else` line is not visible in this excerpt) construct the
// wrapper with plain new. Nothing visible in this function deletes it.
// NOTE(review): presumably Func releases itself after running -- confirm.
247 auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
249 fiber->setFunction(std::ref(*funcLoc));
// data_ carries the fiber's own address; runReadyFiber passes it to
// jumpContext.
252 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
253 readyFibers_.push_back(*fiber);
255 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
258 ensureLoopScheduled();
// Wraps `func` in a heap-allocated RemoteTask and hands it to
// remoteTaskQueue_ through loopController_->scheduleThreadSafe.
// NOTE(review): the declarations of `task` and `insertHead`, and the head of
// the condition below, are not visible in this excerpt.
261 template <typename F>
262 void FiberManager::addTaskRemote(F&& func) {
// Manager currently associated with the calling thread; the Unsafe getter
// returns the raw pointer without asserting that it is set.
264 auto currentFm = getFiberManagerUnsafe();
266 currentFm->currentFiber_ &&
267 currentFm->localType_ == localType_) {
// The caller is running on a fiber whose manager uses the same local type:
// pass that fiber's local data to the RemoteTask as well.
268 return folly::make_unique<RemoteTask>(
269 std::forward<F>(func),
270 currentFm->currentFiber_->localData_);
// Otherwise create the task from the function alone.
272 return folly::make_unique<RemoteTask>(std::forward<F>(func));
// release() passes the raw task pointer to the queue; loopUntilNoReady wraps
// it in a unique_ptr again when it sweeps remoteTaskQueue_.
275 [&]() { return remoteTaskQueue_.insertHead(task.release()); };
276 loopController_->scheduleThreadSafe(std::ref(insertHead));
// Compile-time trait: `value` is true only when the type argument is exactly
// `folly::Try<T>&&` for some T, and false for every other type. addTaskFinally
// uses it to check the first parameter of the `finally` callable.
279 template <typename X>
280 struct IsRvalueRefTry { static const bool value = false; };
281 template <typename T>
282 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
// Helper for addTaskFinally: declares the Func and Finally wrappers and the
// allocateInBuffer flag. The two wrappers share a result slot: Finally holds
// an Optional<Try<Result>> by value and Func holds a reference to it.
// NOTE(review): the class headers, call operators and most data members are
// not visible in this excerpt; the comments below cover only visible lines.
284 // We need this to be in a struct, not inlined in addTaskFinally, because clang
285 // crashes otherwise.
286 template <typename F, typename G>
287 struct FiberManager::AddTaskFinallyHelper {
// Result is whatever calling F with no arguments returns.
291 typedef typename std::result_of<F()>::type Result;
// True when a Func followed by a Finally fits into Fiber::kUserBufferSize.
293 static constexpr bool allocateInBuffer =
294 sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
300 finally_(std::move(finally)),
// Pass the stored Try to the user's finally callable as an rvalue; a failure
// is reported through exceptionCallback_ (the try/catch is not visible here).
306 finally_(std::move(*result_));
308 fm_.exceptionCallback_(std::current_exception(),
309 "running Finally functor");
312 if (allocateInBuffer) {
// Result slot, owned here and written through the reference held by Func.
323 folly::Optional<folly::Try<Result>> result_;
// Func binds its result_ reference to the slot inside the given Finally.
329 Func(F&& func, Finally& finally) :
330 func_(std::move(func)), result_(finally.result_) {}
// Run the user function through makeTryWith and store the resulting Try in
// the shared slot.
333 result_ = folly::makeTryWith(std::move(func_));
335 if (allocateInBuffer) {
344 folly::Optional<folly::Try<Result>>& result_;
// Queues `func` to run on a fiber, with `finally` registered as its finally
// function via setFunctionFinally (runReadyFiber calls finallyFunc_ once the
// fiber is INVALID). The visible compile-time check messages require that
// finally's first argument is Try<T>&& and that T is convertible from func's
// return type.
// NOTE(review): `func` and `finally` are forwarding references but are passed
// on with std::move below, so an lvalue argument would be moved from; addTask
// uses std::forward for the same purpose -- consider aligning.
348 template <typename F, typename G>
349 void FiberManager::addTaskFinally(F&& func, G&& finally) {
350 typedef typename std::result_of<F()>::type Result;
// The openers of the two compile-time checks are not visible in this excerpt;
// only their conditions and messages are.
353 IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
354 "finally(arg): arg must be Try<T>&&");
358 typename std::remove_reference<
359 typename FirstArgOf<G>::type
360 >::type::element_type
362 "finally(Try<T>&&): T must be convertible from func()'s return type");
364 auto fiber = getFiber();
365 initLocalData(*fiber);
367 typedef AddTaskFinallyHelper<F,G> Helper;
// When both wrappers fit, place Func at the start of the fiber's user buffer
// and Finally directly after it (funcLoc + 1).
369 if (Helper::allocateInBuffer) {
370 auto funcLoc = static_cast<typename Helper::Func*>(
371 fiber->getUserBuffer());
372 auto finallyLoc = static_cast<typename Helper::Finally*>(
373 static_cast<void*>(funcLoc + 1));
// Finally is constructed first because Func's constructor takes a reference
// to it.
375 new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
376 new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
378 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
// Otherwise (the `else` line is not visible in this excerpt) both wrappers
// are created with plain new, in the same order.
380 auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
381 auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
383 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
// Same hand-off as addTask: store the fiber's address in data_, queue it,
// notify the observer and make sure a loop run is scheduled.
386 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
387 readyFibers_.push_back(*fiber);
389 observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
392 ensureLoopScheduled();
// Public entry point: forwards `func` to runInMainContextHelper and returns
// its result. The helper has two overloads, one for a void result and one for
// a non-void result, selected with enable_if.
395 template <typename F>
396 typename std::result_of<F()>::type
397 FiberManager::runInMainContext(F&& func) {
398 return runInMainContextHelper(std::forward<F>(func));
// Overload for callables with a non-void result. Wraps the call in a lambda
// that stores the outcome in a local Try, publishes that lambda as
// immediateFunc_, and preempts the active fiber with AWAITING_IMMEDIATE;
// runReadyFiber handles that state on the main context. Returns the value
// held by the Try afterwards.
401 template <typename F>
402 inline typename std::enable_if<
403 !std::is_same<typename std::result_of<F()>::type, void>::value,
404 typename std::result_of<F()>::type>::type
405 FiberManager::runInMainContextHelper(F&& func) {
// NOTE(review): the body of this branch is not visible in this excerpt;
// presumably func is called directly when no fiber is active -- confirm.
406 if (UNLIKELY(activeFiber_ == nullptr)) {
410 typedef typename std::result_of<F()>::type Result;
412 folly::Try<Result> result;
// Capturing by reference relies on this frame staying alive until the lambda
// has run; the function does not continue past preempt() before then.
413 auto f = [&func, &result]() mutable {
414 result = folly::makeTryWith(std::forward<F>(func));
417 immediateFunc_ = std::ref(f);
418 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
// NOTE(review): presumably value() rethrows when the Try holds an exception,
// which would propagate a failure of func to the caller -- confirm.
420 return std::move(result.value());
// Overload for callables returning void: publishes a reference to `func` as
// immediateFunc_ and preempts the active fiber with AWAITING_IMMEDIATE;
// runReadyFiber handles that state on the main context.
423 template <typename F>
424 inline typename std::enable_if<
425 std::is_same<typename std::result_of<F()>::type, void>::value,
427 FiberManager::runInMainContextHelper(F&& func) {
// NOTE(review): the body of this branch is not visible in this excerpt;
// presumably func is called directly when no fiber is active -- confirm.
428 if (UNLIKELY(activeFiber_ == nullptr)) {
433 immediateFunc_ = std::ref(func);
434 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
// Returns a reference to the manager stored in currentFiberManager_. The
// pointer must be non-null; this is only checked by the assert below, so use
// getFiberManagerUnsafe() when it may be unset.
437 inline FiberManager& FiberManager::getFiberManager() {
438 assert(currentFiberManager_ != nullptr);
439 return *currentFiberManager_;
// Returns currentFiberManager_ as a raw pointer without any check; callers
// such as initLocalData and addTaskRemote test the result for null themselves.
442 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
443 return currentFiberManager_;
// Returns true when activeFiber_ is set.
446 inline bool FiberManager::hasActiveFiber() const {
447 return activeFiber_ != nullptr;
// Preempts the active fiber with state YIELDED; runReadyFiber then puts it on
// yieldedFibers_. Must be called on this manager while one of its fibers is
// active and RUNNING; these preconditions are checked only by the asserts.
450 inline void FiberManager::yield() {
451 assert(currentFiberManager_ == this);
452 assert(activeFiber_ != nullptr);
453 assert(activeFiber_->state_ == Fiber::RUNNING);
454 activeFiber_->preempt(Fiber::YIELDED);
// Returns a reference to local storage of type T. When T is the manager's
// configured local type (localType_) and a fiber is current, this is the
// fiber's own localData_; in every other case it is localThread<T>().
457 template <typename T>
458 T& FiberManager::local() {
459 if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
460 return currentFiber_->localData_.get<T>();
462 return localThread<T>();
// Provides a T with thread-local storage. One preprocessor branch declares a
// `static thread_local T`; the other uses folly's ThreadLocal<T> because,
// per the existing note, OSX does not support thread_local.
// NOTE(review): the #if condition and both return statements are not visible
// in this excerpt -- confirm against the full file.
465 template <typename T>
466 T& FiberManager::localThread() {
468 static thread_local T t;
470 #else // osx doesn't support thread_local
471 static ThreadLocal<T> t;
// Prepares a newly obtained fiber: when the caller is itself running on a
// fiber whose manager uses the same local type, that fiber's localData_ is
// copied into the new fiber. The result of RequestContext::saveContext() is
// stored in the new fiber's rcontext_.
476 inline void FiberManager::initLocalData(Fiber& fiber) {
477 auto fm = getFiberManagerUnsafe();
478 if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
479 fiber.localData_ = fm->currentFiber_->localData_;
// NOTE(review): the closing brace of the `if` is not visible in this excerpt,
// so whether the next statement runs unconditionally cannot be confirmed here.
481 fiber.rcontext_ = RequestContext::saveContext();
// Constructor. Takes ownership of the loop controller, stores the
// preprocessed options, installs a default exception callback that logs at
// DFATAL, creates the timeout manager and pool resizer, records
// typeid(LocalT) as the local type, and registers this manager with the
// loop controller.
// NOTE(review): the remaining parameters (including `options`) are not
// visible in this excerpt.
484 template <typename LocalT>
485 FiberManager::FiberManager(
487 std::unique_ptr<LoopController> loopController__,
489 loopController_(std::move(loopController__)),
490 stackAllocator_(options.useGuardPages),
491 options_(preprocessOptions(std::move(options))),
// Default callback: rethrows the captured exception so it can be examined.
// A std::exception is logged with its dynamic type name and message; the
// second LOG statement covers the unknown-exception case (its catch clause
// is not visible here). Both messages include the context string.
492 exceptionCallback_([](std::exception_ptr eptr, std::string context) {
494 std::rethrow_exception(eptr);
495 } catch (const std::exception& e) {
496 LOG(DFATAL) << "Exception " << typeid(e).name()
497 << " with message '" << e.what() << "' was thrown in "
498 << "FiberManager with context '" << context << "'";
501 LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
502 << "context '" << context << "'";
506 timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
507 fibersPoolResizer_(*this),
508 localType_(typeid(LocalT)) {
// Give the loop controller a pointer back to this manager.
509 loopController_->setFiberManager(this);
// Calls `func` with a Promise<Result> bound to a local Try and a baton, from
// inside the callable passed to baton.wait, and returns the contents of the
// Try via folly::moveFromTry. Result is the value_type of func's first
// argument type.
// NOTE(review): the declaration of `baton` is not visible in this excerpt.
// Presumably wait() returns once the promise has been fulfilled -- confirm
// against Baton and Promise.
512 template <typename F>
513 typename FirstArgOf<F>::type::value_type
514 inline await(F&& func) {
515 typedef typename FirstArgOf<F>::type::value_type Result;
517 folly::Try<Result> result;
// The lambda captures by reference, so func, result and baton must outlive
// the wait() call; all three live in this frame.
520 baton.wait([&func, &result, &baton]() mutable {
521 func(Promise<Result>(result, baton));
524 return folly::moveFromTry(result);