From c93226d1035504e84e4015593dd8449503c02a2e Mon Sep 17 00:00:00 2001 From: Andrii Grynenko Date: Mon, 29 Aug 2016 16:02:38 -0700 Subject: [PATCH] Resolve fibers-futures dependency Reviewed By: mzlee Differential Revision: D3780312 fbshipit-source-id: c42c8f0a06b82520ee1b46f105a2a85ad524c442 --- folly/Makefile.am | 2 + folly/fibers/AddTasks.h | 2 +- folly/fibers/Baton-inl.h | 2 +- folly/fibers/Baton.cpp | 2 +- folly/fibers/EventBaseLoopController.h | 2 +- folly/fibers/Fiber.cpp | 2 +- folly/fibers/FiberManager-inl.h | 501 --------------------- folly/fibers/FiberManager.cpp | 2 +- folly/fibers/FiberManager.h | 556 +---------------------- folly/fibers/FiberManagerFuture.h | 54 +++ folly/fibers/FiberManagerInternal-inl.h | 523 +++++++++++++++++++++ folly/fibers/FiberManagerInternal.h | 573 ++++++++++++++++++++++++ folly/fibers/FiberManagerMap.h | 2 +- folly/fibers/ForEach-inl.h | 2 +- folly/fibers/WhenN-inl.h | 2 +- 15 files changed, 1162 insertions(+), 1065 deletions(-) create mode 100644 folly/fibers/FiberManagerFuture.h create mode 100644 folly/fibers/FiberManagerInternal-inl.h create mode 100644 folly/fibers/FiberManagerInternal.h diff --git a/folly/Makefile.am b/folly/Makefile.am index ac3f8829..baffeb45 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -514,6 +514,8 @@ nobase_follyinclude_HEADERS += \ fibers/Fiber-inl.h \ fibers/FiberManager.h \ fibers/FiberManager-inl.h \ + fibers/FiberManagerInternal.h \ + fibers/FiberManagerInternal-inl.h \ fibers/FiberManagerMap.h \ fibers/ForEach.h \ fibers/ForEach-inl.h \ diff --git a/folly/fibers/AddTasks.h b/folly/fibers/AddTasks.h index 63785eaf..ecbcaf3d 100644 --- a/folly/fibers/AddTasks.h +++ b/folly/fibers/AddTasks.h @@ -19,7 +19,7 @@ #include #include -#include +#include #include #include diff --git a/folly/fibers/Baton-inl.h b/folly/fibers/Baton-inl.h index 6ca2de93..adec3c5c 100644 --- a/folly/fibers/Baton-inl.h +++ b/folly/fibers/Baton-inl.h @@ -14,7 +14,7 @@ * limitations under the License. */ #include -#include +#include namespace folly { namespace fibers { diff --git a/folly/fibers/Baton.cpp b/folly/fibers/Baton.cpp index 5f5a104c..bd685cff 100644 --- a/folly/fibers/Baton.cpp +++ b/folly/fibers/Baton.cpp @@ -18,7 +18,7 @@ #include #include -#include +#include #include namespace folly { diff --git a/folly/fibers/EventBaseLoopController.h b/folly/fibers/EventBaseLoopController.h index f1c41287..017b7522 100644 --- a/folly/fibers/EventBaseLoopController.h +++ b/folly/fibers/EventBaseLoopController.h @@ -15,7 +15,7 @@ */ #pragma once -#include +#include #include #include #include diff --git a/folly/fibers/Fiber.cpp b/folly/fibers/Fiber.cpp index ea33b6bf..afb72c07 100644 --- a/folly/fibers/Fiber.cpp +++ b/folly/fibers/Fiber.cpp @@ -23,7 +23,7 @@ #include #include #include -#include +#include #include #include diff --git a/folly/fibers/FiberManager-inl.h b/folly/fibers/FiberManager-inl.h index 3fe5b5f5..99c51b48 100644 --- a/folly/fibers/FiberManager-inl.h +++ b/folly/fibers/FiberManager-inl.h @@ -15,277 +15,11 @@ */ #pragma once -#include - -#include -#include -#include -#include -#include -#ifdef __APPLE__ -#include -#endif -#include -#include -#include -#include #include -#include namespace folly { namespace fibers { -namespace { - -inline FiberManager::Options preprocessOptions(FiberManager::Options opts) { -#ifdef FOLLY_SANITIZE_ADDRESS - /* ASAN needs a lot of extra stack space. - 16x is a conservative estimate, 8x also worked with tests - where it mattered. Note that overallocating here does not necessarily - increase RSS, since unused memory is pretty much free. */ - opts.stackSize *= 16; -#endif - return opts; -} - -} // anonymous - -inline void FiberManager::ensureLoopScheduled() { - if (isLoopScheduled_) { - return; - } - - isLoopScheduled_ = true; - loopController_->schedule(); -} - -inline intptr_t FiberManager::activateFiber(Fiber* fiber) { - DCHECK_EQ(activeFiber_, (Fiber*)nullptr); - -#ifdef FOLLY_SANITIZE_ADDRESS - registerFiberActivationWithAsan(fiber); -#endif - - activeFiber_ = fiber; - return jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_); -} - -inline intptr_t FiberManager::deactivateFiber(Fiber* fiber) { - DCHECK_EQ(activeFiber_, fiber); - -#ifdef FOLLY_SANITIZE_ADDRESS - registerFiberDeactivationWithAsan(fiber); -#endif - - activeFiber_ = nullptr; - return jumpContext(&fiber->fcontext_, &mainContext_, 0); -} - -inline void FiberManager::runReadyFiber(Fiber* fiber) { - SCOPE_EXIT { - assert(currentFiber_ == nullptr); - assert(activeFiber_ == nullptr); - }; - - assert( - fiber->state_ == Fiber::NOT_STARTED || - fiber->state_ == Fiber::READY_TO_RUN); - currentFiber_ = fiber; - fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_)); - if (observer_) { - observer_->starting(reinterpret_cast(fiber)); - } - - while (fiber->state_ == Fiber::NOT_STARTED || - fiber->state_ == Fiber::READY_TO_RUN) { - activateFiber(fiber); - if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) { - try { - immediateFunc_(); - } catch (...) { - exceptionCallback_(std::current_exception(), "running immediateFunc_"); - } - immediateFunc_ = nullptr; - fiber->state_ = Fiber::READY_TO_RUN; - } - } - - if (fiber->state_ == Fiber::AWAITING) { - awaitFunc_(*fiber); - awaitFunc_ = nullptr; - if (observer_) { - observer_->stopped(reinterpret_cast(fiber)); - } - currentFiber_ = nullptr; - fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_)); - } else if (fiber->state_ == Fiber::INVALID) { - assert(fibersActive_ > 0); - --fibersActive_; - // Making sure that task functor is deleted once task is complete. - // NOTE: we must do it on main context, as the fiber is not - // running at this point. - fiber->func_ = nullptr; - fiber->resultFunc_ = nullptr; - if (fiber->finallyFunc_) { - try { - fiber->finallyFunc_(); - } catch (...) { - exceptionCallback_(std::current_exception(), "running finallyFunc_"); - } - fiber->finallyFunc_ = nullptr; - } - // Make sure LocalData is not accessible from its destructor - if (observer_) { - observer_->stopped(reinterpret_cast(fiber)); - } - currentFiber_ = nullptr; - fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_)); - fiber->localData_.reset(); - fiber->rcontext_.reset(); - - if (fibersPoolSize_ < options_.maxFibersPoolSize || - options_.fibersPoolResizePeriodMs > 0) { - fibersPool_.push_front(*fiber); - ++fibersPoolSize_; - } else { - delete fiber; - assert(fibersAllocated_ > 0); - --fibersAllocated_; - } - } else if (fiber->state_ == Fiber::YIELDED) { - if (observer_) { - observer_->stopped(reinterpret_cast(fiber)); - } - currentFiber_ = nullptr; - fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_)); - fiber->state_ = Fiber::READY_TO_RUN; - yieldedFibers_.push_back(*fiber); - } -} - -inline bool FiberManager::loopUntilNoReady() { -#ifndef _WIN32 - if (UNLIKELY(!alternateSignalStackRegistered_)) { - registerAlternateSignalStack(); - } -#endif - - // Support nested FiberManagers - auto originalFiberManager = this; - std::swap(currentFiberManager_, originalFiberManager); - - SCOPE_EXIT { - isLoopScheduled_ = false; - if (!readyFibers_.empty()) { - ensureLoopScheduled(); - } - std::swap(currentFiberManager_, originalFiberManager); - CHECK_EQ(this, originalFiberManager); - }; - - bool hadRemoteFiber = true; - while (hadRemoteFiber) { - hadRemoteFiber = false; - - while (!readyFibers_.empty()) { - auto& fiber = readyFibers_.front(); - readyFibers_.pop_front(); - runReadyFiber(&fiber); - } - - remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) { - runReadyFiber(fiber); - hadRemoteFiber = true; - }); - - remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) { - std::unique_ptr task(taskPtr); - auto fiber = getFiber(); - if (task->localData) { - fiber->localData_ = *task->localData; - } - fiber->rcontext_ = std::move(task->rcontext); - - fiber->setFunction(std::move(task->func)); - fiber->data_ = reinterpret_cast(fiber); - if (observer_) { - observer_->runnable(reinterpret_cast(fiber)); - } - runReadyFiber(fiber); - hadRemoteFiber = true; - }); - } - - if (observer_) { - for (auto& yielded : yieldedFibers_) { - observer_->runnable(reinterpret_cast(&yielded)); - } - } - readyFibers_.splice(readyFibers_.end(), yieldedFibers_); - - return fibersActive_ > 0; -} - -// We need this to be in a struct, not inlined in addTask, because clang crashes -// otherwise. -template -struct FiberManager::AddTaskHelper { - class Func; - - static constexpr bool allocateInBuffer = - sizeof(Func) <= Fiber::kUserBufferSize; - - class Func { - public: - Func(F&& func, FiberManager& fm) : func_(std::forward(func)), fm_(fm) {} - - void operator()() { - try { - func_(); - } catch (...) { - fm_.exceptionCallback_( - std::current_exception(), "running Func functor"); - } - if (allocateInBuffer) { - this->~Func(); - } else { - delete this; - } - } - - private: - F func_; - FiberManager& fm_; - }; -}; - -template -void FiberManager::addTask(F&& func) { - typedef AddTaskHelper Helper; - - auto fiber = getFiber(); - initLocalData(*fiber); - - if (Helper::allocateInBuffer) { - auto funcLoc = static_cast(fiber->getUserBuffer()); - new (funcLoc) typename Helper::Func(std::forward(func), *this); - - fiber->setFunction(std::ref(*funcLoc)); - } else { - auto funcLoc = new typename Helper::Func(std::forward(func), *this); - - fiber->setFunction(std::ref(*funcLoc)); - } - - fiber->data_ = reinterpret_cast(fiber); - readyFibers_.push_back(*fiber); - if (observer_) { - observer_->runnable(reinterpret_cast(fiber)); - } - - ensureLoopScheduled(); -} - template auto FiberManager::addTaskFuture(F&& func) -> folly::Future< typename folly::Unit::Lift::type>::type> { @@ -302,23 +36,6 @@ auto FiberManager::addTaskFuture(F&& func) -> folly::Future< return f; } -template -void FiberManager::addTaskRemote(F&& func) { - auto task = [&]() { - auto currentFm = getFiberManagerUnsafe(); - if (currentFm && currentFm->currentFiber_ && - currentFm->localType_ == localType_) { - return folly::make_unique( - std::forward(func), currentFm->currentFiber_->localData_); - } - return folly::make_unique(std::forward(func)); - }(); - auto insertHead = [&]() { - return remoteTaskQueue_.insertHead(task.release()); - }; - loopController_->scheduleThreadSafe(std::ref(insertHead)); -} - template auto FiberManager::addTaskRemoteFuture(F&& func) -> folly::Future< typename folly::Unit::Lift::type>::type> { @@ -333,223 +50,5 @@ auto FiberManager::addTaskRemoteFuture(F&& func) -> folly::Future< }); return f; } - -template -struct IsRvalueRefTry { - static const bool value = false; -}; -template -struct IsRvalueRefTry&&> { - static const bool value = true; -}; - -// We need this to be in a struct, not inlined in addTaskFinally, because clang -// crashes otherwise. -template -struct FiberManager::AddTaskFinallyHelper { - class Func; - - typedef typename std::result_of::type Result; - - class Finally { - public: - Finally(G finally, FiberManager& fm) - : finally_(std::move(finally)), fm_(fm) {} - - void operator()() { - try { - finally_(std::move(*result_)); - } catch (...) { - fm_.exceptionCallback_( - std::current_exception(), "running Finally functor"); - } - - if (allocateInBuffer) { - this->~Finally(); - } else { - delete this; - } - } - - private: - friend class Func; - - G finally_; - folly::Optional> result_; - FiberManager& fm_; - }; - - class Func { - public: - Func(F func, Finally& finally) - : func_(std::move(func)), result_(finally.result_) {} - - void operator()() { - result_ = folly::makeTryWith(std::move(func_)); - - if (allocateInBuffer) { - this->~Func(); - } else { - delete this; - } - } - - private: - F func_; - folly::Optional>& result_; - }; - - static constexpr bool allocateInBuffer = - sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize; -}; - -template -void FiberManager::addTaskFinally(F&& func, G&& finally) { - typedef typename std::result_of::type Result; - - static_assert( - IsRvalueRefTry::type>::value, - "finally(arg): arg must be Try&&"); - static_assert( - std::is_convertible< - Result, - typename std::remove_reference< - typename FirstArgOf::type>::type::element_type>::value, - "finally(Try&&): T must be convertible from func()'s return type"); - - auto fiber = getFiber(); - initLocalData(*fiber); - - typedef AddTaskFinallyHelper< - typename std::decay::type, - typename std::decay::type> - Helper; - - if (Helper::allocateInBuffer) { - auto funcLoc = static_cast(fiber->getUserBuffer()); - auto finallyLoc = - static_cast(static_cast(funcLoc + 1)); - - new (finallyLoc) typename Helper::Finally(std::forward(finally), *this); - new (funcLoc) typename Helper::Func(std::forward(func), *finallyLoc); - - fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc)); - } else { - auto finallyLoc = - new typename Helper::Finally(std::forward(finally), *this); - auto funcLoc = - new typename Helper::Func(std::forward(func), *finallyLoc); - - fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc)); - } - - fiber->data_ = reinterpret_cast(fiber); - readyFibers_.push_back(*fiber); - if (observer_) { - observer_->runnable(reinterpret_cast(fiber)); - } - - ensureLoopScheduled(); -} - -template -typename std::result_of::type FiberManager::runInMainContext(F&& func) { - if (UNLIKELY(activeFiber_ == nullptr)) { - return func(); - } - - typedef typename std::result_of::type Result; - - folly::Try result; - auto f = [&func, &result]() mutable { - result = folly::makeTryWith(std::forward(func)); - }; - - immediateFunc_ = std::ref(f); - activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE); - - return std::move(result).value(); -} - -inline FiberManager& FiberManager::getFiberManager() { - assert(currentFiberManager_ != nullptr); - return *currentFiberManager_; -} - -inline FiberManager* FiberManager::getFiberManagerUnsafe() { - return currentFiberManager_; -} - -inline bool FiberManager::hasActiveFiber() const { - return activeFiber_ != nullptr; -} - -inline void FiberManager::yield() { - assert(currentFiberManager_ == this); - assert(activeFiber_ != nullptr); - assert(activeFiber_->state_ == Fiber::RUNNING); - activeFiber_->preempt(Fiber::YIELDED); -} - -template -T& FiberManager::local() { - if (std::type_index(typeid(T)) == localType_ && currentFiber_) { - return currentFiber_->localData_.get(); - } - return localThread(); -} - -template -T& FiberManager::localThread() { -#ifndef __APPLE__ - static thread_local T t; - return t; -#else // osx doesn't support thread_local - static ThreadLocal t; - return *t; -#endif -} - -inline void FiberManager::initLocalData(Fiber& fiber) { - auto fm = getFiberManagerUnsafe(); - if (fm && fm->currentFiber_ && fm->localType_ == localType_) { - fiber.localData_ = fm->currentFiber_->localData_; - } - fiber.rcontext_ = RequestContext::saveContext(); -} - -template -FiberManager::FiberManager( - LocalType, - std::unique_ptr loopController__, - Options options) - : loopController_(std::move(loopController__)), - stackAllocator_(options.useGuardPages), - options_(preprocessOptions(std::move(options))), - exceptionCallback_([](std::exception_ptr eptr, std::string context) { - try { - std::rethrow_exception(eptr); - } catch (const std::exception& e) { - LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '" - << e.what() << "' was thrown in " - << "FiberManager with context '" << context << "'"; - } catch (...) { - LOG(DFATAL) << "Unknown exception was thrown in FiberManager with " - << "context '" << context << "'"; - } - }), - timeoutManager_(std::make_shared(*loopController_)), - fibersPoolResizer_(*this), - localType_(typeid(LocalT)) { - loopController_->setFiberManager(this); -} - -template -typename FirstArgOf::type::value_type inline await(F&& func) { - typedef typename FirstArgOf::type::value_type Result; - typedef typename FirstArgOf::type::baton_type BatonT; - - return Promise::await(std::forward(func)); -} } } diff --git a/folly/fibers/FiberManager.cpp b/folly/fibers/FiberManager.cpp index 64b5b9d5..6528ca53 100644 --- a/folly/fibers/FiberManager.cpp +++ b/folly/fibers/FiberManager.cpp @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "FiberManager.h" +#include "FiberManagerInternal.h" #include diff --git a/folly/fibers/FiberManager.h b/folly/fibers/FiberManager.h index 558dce6f..b63abb4a 100644 --- a/folly/fibers/FiberManager.h +++ b/folly/fibers/FiberManager.h @@ -15,559 +15,5 @@ */ #pragma once -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -namespace folly { - -template -class Future; - -namespace fibers { - -class Baton; -class Fiber; -class LoopController; -class TimeoutController; - -template -class LocalType {}; - -class InlineFunctionRunner { - public: - virtual ~InlineFunctionRunner() {} - - /** - * func must be executed inline and only once. - */ - virtual void run(folly::Function func) = 0; -}; - -/** - * @class FiberManager - * @brief Single-threaded task execution engine. - * - * FiberManager allows semi-parallel task execution on the same thread. Each - * task can notify FiberManager that it is blocked on something (via await()) - * call. This will pause execution of this task and it will be resumed only - * when it is unblocked (via setData()). - */ -class FiberManager : public ::folly::Executor { - public: - struct Options { - static constexpr size_t kDefaultStackSize{16 * 1024}; - - /** - * Maximum stack size for fibers which will be used for executing all the - * tasks. - */ - size_t stackSize{kDefaultStackSize}; - - /** - * Record exact amount of stack used. - * - * This is fairly expensive: we fill each newly allocated stack - * with some known value and find the boundary of unused stack - * with linear search every time we surrender the stack back to fibersPool. - * 0 disables stack recording. - */ - size_t recordStackEvery{0}; - - /** - * Keep at most this many free fibers in the pool. - * This way the total number of fibers in the system is always bounded - * by the number of active fibers + maxFibersPoolSize. - */ - size_t maxFibersPoolSize{1000}; - - /** - * Protect limited amount of fiber stacks with guard pages. - */ - bool useGuardPages{true}; - - /** - * Free unnecessary fibers in the fibers pool every fibersPoolResizePeriodMs - * milliseconds. If value is 0, periodic resizing of the fibers pool is - * disabled. - */ - uint32_t fibersPoolResizePeriodMs{0}; - - constexpr Options() {} - }; - - using ExceptionCallback = - folly::Function; - - FiberManager(const FiberManager&) = delete; - FiberManager& operator=(const FiberManager&) = delete; - - /** - * Initializes, but doesn't start FiberManager loop - * - * @param loopController - * @param options FiberManager options - */ - explicit FiberManager( - std::unique_ptr loopController, - Options options = Options()); - - /** - * Initializes, but doesn't start FiberManager loop - * - * @param loopController - * @param options FiberManager options - * @tparam LocalT only local of this type may be stored on fibers. - * Locals of other types will be considered thread-locals. - */ - template - FiberManager( - LocalType, - std::unique_ptr loopController, - Options options = Options()); - - ~FiberManager(); - - /** - * Controller access. - */ - LoopController& loopController(); - const LoopController& loopController() const; - - /** - * Keeps running ready tasks until the list of ready tasks is empty. - * - * @return True if there are any waiting tasks remaining. - */ - bool loopUntilNoReady(); - - /** - * @return true if there are outstanding tasks. - */ - bool hasTasks() const; - - /** - * Sets exception callback which will be called if any of the tasks throws an - * exception. - * - * @param ec - */ - void setExceptionCallback(ExceptionCallback ec); - - /** - * Add a new task to be executed. Must be called from FiberManager's thread. - * - * @param func Task functor; must have a signature of `void func()`. - * The object will be destroyed once task execution is complete. - */ - template - void addTask(F&& func); - - /** - * Add a new task to be executed and return a future that will be set on - * return from func. Must be called from FiberManager's thread. - * - * @param func Task functor; must have a signature of `void func()`. - * The object will be destroyed once task execution is complete. - */ - template - auto addTaskFuture(F&& func) -> folly::Future< - typename folly::Unit::Lift::type>::type>; - /** - * Add a new task to be executed. Safe to call from other threads. - * - * @param func Task function; must have a signature of `void func()`. - * The object will be destroyed once task execution is complete. - */ - template - void addTaskRemote(F&& func); - - /** - * Add a new task to be executed and return a future that will be set on - * return from func. Safe to call from other threads. - * - * @param func Task function; must have a signature of `void func()`. - * The object will be destroyed once task execution is complete. - */ - template - auto addTaskRemoteFuture(F&& func) -> folly::Future< - typename folly::Unit::Lift::type>::type>; - - // Executor interface calls addTaskRemote - void add(folly::Func f) override { - addTaskRemote(std::move(f)); - } - - /** - * Add a new task. When the task is complete, execute finally(Try&&) - * on the main context. - * - * @param func Task functor; must have a signature of `T func()` for some T. - * @param finally Finally functor; must have a signature of - * `void finally(Try&&)` and will be passed - * the result of func() (including the exception if occurred). - */ - template - void addTaskFinally(F&& func, G&& finally); - - /** - * If called from a fiber, immediately switches to the FiberManager's context - * and runs func(), going back to the Fiber's context after completion. - * Outside a fiber, just calls func() directly. - * - * @return value returned by func(). - */ - template - typename std::result_of::type runInMainContext(F&& func); - - /** - * Returns a refference to a fiber-local context for given Fiber. Should be - * always called with the same T for each fiber. Fiber-local context is lazily - * default-constructed on first request. - * When new task is scheduled via addTask / addTaskRemote from a fiber its - * fiber-local context is copied into the new fiber. - */ - template - T& local(); - - template - static T& localThread(); - - /** - * @return How many fiber objects (and stacks) has this manager allocated. - */ - size_t fibersAllocated() const; - - /** - * @return How many of the allocated fiber objects are currently - * in the free pool. - */ - size_t fibersPoolSize() const; - - /** - * return true if running activeFiber_ is not nullptr. - */ - bool hasActiveFiber() const; - - /** - * @return The currently running fiber or null if no fiber is executing. - */ - Fiber* currentFiber() const { - return currentFiber_; - } - - /** - * @return What was the most observed fiber stack usage (in bytes). - */ - size_t stackHighWatermark() const; - - /** - * Yield execution of the currently running fiber. Must only be called from a - * fiber executing on this FiberManager. The calling fiber will be scheduled - * when all other fibers have had a chance to run and the event loop is - * serviced. - */ - void yield(); - - /** - * Setup fibers execution observation/instrumentation. Fiber locals are - * available to observer. - * - * @param observer Fiber's execution observer. - */ - void setObserver(ExecutionObserver* observer); - - /** - * @return Current observer for this FiberManager. Returns nullptr - * if no observer has been set. - */ - ExecutionObserver* getObserver(); - - /** - * Setup fibers preempt runner. - */ - void setPreemptRunner(InlineFunctionRunner* preemptRunner); - - /** - * Returns an estimate of the number of fibers which are waiting to run (does - * not include fibers or tasks scheduled remotely). - */ - size_t runQueueSize() const { - return readyFibers_.size() + yieldedFibers_.size(); - } - - static FiberManager& getFiberManager(); - static FiberManager* getFiberManagerUnsafe(); - - private: - friend class Baton; - friend class Fiber; - template - struct AddTaskHelper; - template - struct AddTaskFinallyHelper; - - struct RemoteTask { - template - explicit RemoteTask(F&& f) - : func(std::forward(f)), rcontext(RequestContext::saveContext()) {} - template - RemoteTask(F&& f, const Fiber::LocalData& localData_) - : func(std::forward(f)), - localData(folly::make_unique(localData_)), - rcontext(RequestContext::saveContext()) {} - folly::Function func; - std::unique_ptr localData; - std::shared_ptr rcontext; - AtomicIntrusiveLinkedListHook nextRemoteTask; - }; - - intptr_t activateFiber(Fiber* fiber); - intptr_t deactivateFiber(Fiber* fiber); - - typedef folly::IntrusiveList FiberTailQueue; - typedef folly::IntrusiveList - GlobalFiberTailQueue; - - Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */ - /** - * Same as active fiber, but also set for functions run from fiber on main - * context. - */ - Fiber* currentFiber_{nullptr}; - - FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */ - FiberTailQueue yieldedFibers_; /**< queue of fibers which have yielded - execution */ - FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */ - - GlobalFiberTailQueue allFibers_; /**< list of all Fiber objects owned */ - - size_t fibersAllocated_{0}; /**< total number of fibers allocated */ - size_t fibersPoolSize_{0}; /**< total number of fibers in the free pool */ - size_t fibersActive_{0}; /**< number of running or blocked fibers */ - size_t fiberId_{0}; /**< id of last fiber used */ - - /** - * Maximum number of active fibers in the last period lasting - * Options::fibersPoolResizePeriod milliseconds. - */ - size_t maxFibersActiveLastPeriod_{0}; - - FContext::ContextStruct mainContext_; /**< stores loop function context */ - - std::unique_ptr loopController_; - bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */ - - /** - * When we are inside FiberManager loop this points to FiberManager. Otherwise - * it's nullptr - */ - static FOLLY_TLS FiberManager* currentFiberManager_; - - /** - * Allocator used to allocate stack for Fibers in the pool. - * Allocates stack on the stack of the main context. - */ - GuardPageAllocator stackAllocator_; - - const Options options_; /**< FiberManager options */ - - /** - * Largest observed individual Fiber stack usage in bytes. - */ - size_t stackHighWatermark_{0}; - - /** - * Schedules a loop with loopController (unless already scheduled before). - */ - void ensureLoopScheduled(); - - /** - * @return An initialized Fiber object from the pool - */ - Fiber* getFiber(); - - /** - * Sets local data for given fiber if all conditions are met. - */ - void initLocalData(Fiber& fiber); - - /** - * Function passed to the await call. - */ - folly::Function awaitFunc_; - - /** - * Function passed to the runInMainContext call. - */ - folly::Function immediateFunc_; - - /** - * Preempt runner. - */ - InlineFunctionRunner* preemptRunner_{nullptr}; - - /** - * Fiber's execution observer. - */ - ExecutionObserver* observer_{nullptr}; - - ExceptionCallback exceptionCallback_; /**< task exception callback */ - - folly::AtomicIntrusiveLinkedList - remoteReadyQueue_; - - folly::AtomicIntrusiveLinkedList - remoteTaskQueue_; - - std::shared_ptr timeoutManager_; - - struct FibersPoolResizer { - explicit FibersPoolResizer(FiberManager& fm) : fiberManager_(fm) {} - void operator()(); - - private: - FiberManager& fiberManager_; - }; - - FibersPoolResizer fibersPoolResizer_; - bool fibersPoolResizerScheduled_{false}; - - void doFibersPoolResizing(); - - /** - * Only local of this type will be available for fibers. - */ - std::type_index localType_; - - void runReadyFiber(Fiber* fiber); - void remoteReadyInsert(Fiber* fiber); - -#ifdef FOLLY_SANITIZE_ADDRESS - - // These methods notify ASAN when a fiber is entered/exited so that ASAN can - // find the right stack extents when it needs to poison/unpoison the stack. - - void registerFiberActivationWithAsan(Fiber* fiber); - void registerFiberDeactivationWithAsan(Fiber* fiber); - void unpoisonFiberStack(const Fiber* fiber); - -#endif // FOLLY_SANITIZE_ADDRESS - -#ifndef _WIN32 - bool alternateSignalStackRegistered_{false}; - - void registerAlternateSignalStack(); -#endif -}; - -/** - * @return true iff we are running in a fiber's context - */ -inline bool onFiber() { - auto fm = FiberManager::getFiberManagerUnsafe(); - return fm ? fm->hasActiveFiber() : false; -} - -/** - * Add a new task to be executed. - * - * @param func Task functor; must have a signature of `void func()`. - * The object will be destroyed once task execution is complete. - */ -template -inline void addTask(F&& func) { - return FiberManager::getFiberManager().addTask(std::forward(func)); -} - -/** - * Add a new task. When the task is complete, execute finally(Try&&) - * on the main context. - * Task functor is run and destroyed on the fiber context. - * Finally functor is run and destroyed on the main context. - * - * @param func Task functor; must have a signature of `T func()` for some T. - * @param finally Finally functor; must have a signature of - * `void finally(Try&&)` and will be passed - * the result of func() (including the exception if occurred). - */ -template -inline void addTaskFinally(F&& func, G&& finally) { - return FiberManager::getFiberManager().addTaskFinally( - std::forward(func), std::forward(finally)); -} - -/** - * Blocks task execution until given promise is fulfilled. - * - * Calls function passing in a Promise, which has to be fulfilled. - * - * @return data which was used to fulfill the promise. - */ -template -typename FirstArgOf::type::value_type inline await(F&& func); - -/** - * If called from a fiber, immediately switches to the FiberManager's context - * and runs func(), going back to the Fiber's context after completion. - * Outside a fiber, just calls func() directly. - * - * @return value returned by func(). - */ -template -typename std::result_of::type inline runInMainContext(F&& func) { - auto fm = FiberManager::getFiberManagerUnsafe(); - if (UNLIKELY(fm == nullptr)) { - return func(); - } - return fm->runInMainContext(std::forward(func)); -} - -/** - * Returns a refference to a fiber-local context for given Fiber. Should be - * always called with the same T for each fiber. Fiber-local context is lazily - * default-constructed on first request. - * When new task is scheduled via addTask / addTaskRemote from a fiber its - * fiber-local context is copied into the new fiber. - */ -template -T& local() { - auto fm = FiberManager::getFiberManagerUnsafe(); - if (fm) { - return fm->local(); - } - return FiberManager::localThread(); -} - -inline void yield() { - auto fm = FiberManager::getFiberManagerUnsafe(); - if (fm) { - fm->yield(); - } else { - std::this_thread::yield(); - } -} -} -} - +#include #include diff --git a/folly/fibers/FiberManagerFuture.h b/folly/fibers/FiberManagerFuture.h new file mode 100644 index 00000000..99c51b48 --- /dev/null +++ b/folly/fibers/FiberManagerFuture.h @@ -0,0 +1,54 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace folly { +namespace fibers { + +template +auto FiberManager::addTaskFuture(F&& func) -> folly::Future< + typename folly::Unit::Lift::type>::type> { + using T = typename std::result_of::type; + using FutureT = typename folly::Unit::Lift::type; + + folly::Promise p; + auto f = p.getFuture(); + addTaskFinally( + [func = std::forward(func)]() mutable { return func(); }, + [p = std::move(p)](folly::Try && t) mutable { + p.setTry(std::move(t)); + }); + return f; +} + +template +auto FiberManager::addTaskRemoteFuture(F&& func) -> folly::Future< + typename folly::Unit::Lift::type>::type> { + folly::Promise< + typename folly::Unit::Lift::type>::type> + p; + auto f = p.getFuture(); + addTaskRemote( + [ p = std::move(p), func = std::forward(func), this ]() mutable { + auto t = folly::makeTryWith(std::forward(func)); + runInMainContext([&]() { p.setTry(std::move(t)); }); + }); + return f; +} +} +} diff --git a/folly/fibers/FiberManagerInternal-inl.h b/folly/fibers/FiberManagerInternal-inl.h new file mode 100644 index 00000000..b0aa662d --- /dev/null +++ b/folly/fibers/FiberManagerInternal-inl.h @@ -0,0 +1,523 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include +#include +#include +#include +#include +#ifdef __APPLE__ +#include +#endif +#include +#include +#include +#include +#include + +namespace folly { +namespace fibers { + +namespace { + +inline FiberManager::Options preprocessOptions(FiberManager::Options opts) { +#ifdef FOLLY_SANITIZE_ADDRESS + /* ASAN needs a lot of extra stack space. + 16x is a conservative estimate, 8x also worked with tests + where it mattered. Note that overallocating here does not necessarily + increase RSS, since unused memory is pretty much free. */ + opts.stackSize *= 16; +#endif + return opts; +} + +} // anonymous + +inline void FiberManager::ensureLoopScheduled() { + if (isLoopScheduled_) { + return; + } + + isLoopScheduled_ = true; + loopController_->schedule(); +} + +inline intptr_t FiberManager::activateFiber(Fiber* fiber) { + DCHECK_EQ(activeFiber_, (Fiber*)nullptr); + +#ifdef FOLLY_SANITIZE_ADDRESS + registerFiberActivationWithAsan(fiber); +#endif + + activeFiber_ = fiber; + return jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_); +} + +inline intptr_t FiberManager::deactivateFiber(Fiber* fiber) { + DCHECK_EQ(activeFiber_, fiber); + +#ifdef FOLLY_SANITIZE_ADDRESS + registerFiberDeactivationWithAsan(fiber); +#endif + + activeFiber_ = nullptr; + return jumpContext(&fiber->fcontext_, &mainContext_, 0); +} + +inline void FiberManager::runReadyFiber(Fiber* fiber) { + SCOPE_EXIT { + assert(currentFiber_ == nullptr); + assert(activeFiber_ == nullptr); + }; + + assert( + fiber->state_ == Fiber::NOT_STARTED || + fiber->state_ == Fiber::READY_TO_RUN); + currentFiber_ = fiber; + fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_)); + if (observer_) { + observer_->starting(reinterpret_cast(fiber)); + } + + while (fiber->state_ == Fiber::NOT_STARTED || + fiber->state_ == Fiber::READY_TO_RUN) { + activateFiber(fiber); + if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) { + try { + immediateFunc_(); + } catch (...) { + exceptionCallback_(std::current_exception(), "running immediateFunc_"); + } + immediateFunc_ = nullptr; + fiber->state_ = Fiber::READY_TO_RUN; + } + } + + if (fiber->state_ == Fiber::AWAITING) { + awaitFunc_(*fiber); + awaitFunc_ = nullptr; + if (observer_) { + observer_->stopped(reinterpret_cast(fiber)); + } + currentFiber_ = nullptr; + fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_)); + } else if (fiber->state_ == Fiber::INVALID) { + assert(fibersActive_ > 0); + --fibersActive_; + // Making sure that task functor is deleted once task is complete. + // NOTE: we must do it on main context, as the fiber is not + // running at this point. + fiber->func_ = nullptr; + fiber->resultFunc_ = nullptr; + if (fiber->finallyFunc_) { + try { + fiber->finallyFunc_(); + } catch (...) { + exceptionCallback_(std::current_exception(), "running finallyFunc_"); + } + fiber->finallyFunc_ = nullptr; + } + // Make sure LocalData is not accessible from its destructor + if (observer_) { + observer_->stopped(reinterpret_cast(fiber)); + } + currentFiber_ = nullptr; + fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_)); + fiber->localData_.reset(); + fiber->rcontext_.reset(); + + if (fibersPoolSize_ < options_.maxFibersPoolSize || + options_.fibersPoolResizePeriodMs > 0) { + fibersPool_.push_front(*fiber); + ++fibersPoolSize_; + } else { + delete fiber; + assert(fibersAllocated_ > 0); + --fibersAllocated_; + } + } else if (fiber->state_ == Fiber::YIELDED) { + if (observer_) { + observer_->stopped(reinterpret_cast(fiber)); + } + currentFiber_ = nullptr; + fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_)); + fiber->state_ = Fiber::READY_TO_RUN; + yieldedFibers_.push_back(*fiber); + } +} + +inline bool FiberManager::loopUntilNoReady() { +#ifndef _WIN32 + if (UNLIKELY(!alternateSignalStackRegistered_)) { + registerAlternateSignalStack(); + } +#endif + + // Support nested FiberManagers + auto originalFiberManager = this; + std::swap(currentFiberManager_, originalFiberManager); + + SCOPE_EXIT { + isLoopScheduled_ = false; + if (!readyFibers_.empty()) { + ensureLoopScheduled(); + } + std::swap(currentFiberManager_, originalFiberManager); + CHECK_EQ(this, originalFiberManager); + }; + + bool hadRemoteFiber = true; + while (hadRemoteFiber) { + hadRemoteFiber = false; + + while (!readyFibers_.empty()) { + auto& fiber = readyFibers_.front(); + readyFibers_.pop_front(); + runReadyFiber(&fiber); + } + + remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) { + runReadyFiber(fiber); + hadRemoteFiber = true; + }); + + remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) { + std::unique_ptr task(taskPtr); + auto fiber = getFiber(); + if (task->localData) { + fiber->localData_ = *task->localData; + } + fiber->rcontext_ = std::move(task->rcontext); + + fiber->setFunction(std::move(task->func)); + fiber->data_ = reinterpret_cast(fiber); + if (observer_) { + observer_->runnable(reinterpret_cast(fiber)); + } + runReadyFiber(fiber); + hadRemoteFiber = true; + }); + } + + if (observer_) { + for (auto& yielded : yieldedFibers_) { + observer_->runnable(reinterpret_cast(&yielded)); + } + } + readyFibers_.splice(readyFibers_.end(), yieldedFibers_); + + return fibersActive_ > 0; +} + +// We need this to be in a struct, not inlined in addTask, because clang crashes +// otherwise. +template +struct FiberManager::AddTaskHelper { + class Func; + + static constexpr bool allocateInBuffer = + sizeof(Func) <= Fiber::kUserBufferSize; + + class Func { + public: + Func(F&& func, FiberManager& fm) : func_(std::forward(func)), fm_(fm) {} + + void operator()() { + try { + func_(); + } catch (...) { + fm_.exceptionCallback_( + std::current_exception(), "running Func functor"); + } + if (allocateInBuffer) { + this->~Func(); + } else { + delete this; + } + } + + private: + F func_; + FiberManager& fm_; + }; +}; + +template +void FiberManager::addTask(F&& func) { + typedef AddTaskHelper Helper; + + auto fiber = getFiber(); + initLocalData(*fiber); + + if (Helper::allocateInBuffer) { + auto funcLoc = static_cast(fiber->getUserBuffer()); + new (funcLoc) typename Helper::Func(std::forward(func), *this); + + fiber->setFunction(std::ref(*funcLoc)); + } else { + auto funcLoc = new typename Helper::Func(std::forward(func), *this); + + fiber->setFunction(std::ref(*funcLoc)); + } + + fiber->data_ = reinterpret_cast(fiber); + readyFibers_.push_back(*fiber); + if (observer_) { + observer_->runnable(reinterpret_cast(fiber)); + } + + ensureLoopScheduled(); +} + +template +void FiberManager::addTaskRemote(F&& func) { + auto task = [&]() { + auto currentFm = getFiberManagerUnsafe(); + if (currentFm && currentFm->currentFiber_ && + currentFm->localType_ == localType_) { + return folly::make_unique( + std::forward(func), currentFm->currentFiber_->localData_); + } + return folly::make_unique(std::forward(func)); + }(); + auto insertHead = [&]() { + return remoteTaskQueue_.insertHead(task.release()); + }; + loopController_->scheduleThreadSafe(std::ref(insertHead)); +} + +template +struct IsRvalueRefTry { + static const bool value = false; +}; +template +struct IsRvalueRefTry&&> { + static const bool value = true; +}; + +// We need this to be in a struct, not inlined in addTaskFinally, because clang +// crashes otherwise. +template +struct FiberManager::AddTaskFinallyHelper { + class Func; + + typedef typename std::result_of::type Result; + + class Finally { + public: + Finally(G finally, FiberManager& fm) + : finally_(std::move(finally)), fm_(fm) {} + + void operator()() { + try { + finally_(std::move(*result_)); + } catch (...) { + fm_.exceptionCallback_( + std::current_exception(), "running Finally functor"); + } + + if (allocateInBuffer) { + this->~Finally(); + } else { + delete this; + } + } + + private: + friend class Func; + + G finally_; + folly::Optional> result_; + FiberManager& fm_; + }; + + class Func { + public: + Func(F func, Finally& finally) + : func_(std::move(func)), result_(finally.result_) {} + + void operator()() { + result_ = folly::makeTryWith(std::move(func_)); + + if (allocateInBuffer) { + this->~Func(); + } else { + delete this; + } + } + + private: + F func_; + folly::Optional>& result_; + }; + + static constexpr bool allocateInBuffer = + sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize; +}; + +template +void FiberManager::addTaskFinally(F&& func, G&& finally) { + typedef typename std::result_of::type Result; + + static_assert( + IsRvalueRefTry::type>::value, + "finally(arg): arg must be Try&&"); + static_assert( + std::is_convertible< + Result, + typename std::remove_reference< + typename FirstArgOf::type>::type::element_type>::value, + "finally(Try&&): T must be convertible from func()'s return type"); + + auto fiber = getFiber(); + initLocalData(*fiber); + + typedef AddTaskFinallyHelper< + typename std::decay::type, + typename std::decay::type> + Helper; + + if (Helper::allocateInBuffer) { + auto funcLoc = static_cast(fiber->getUserBuffer()); + auto finallyLoc = + static_cast(static_cast(funcLoc + 1)); + + new (finallyLoc) typename Helper::Finally(std::forward(finally), *this); + new (funcLoc) typename Helper::Func(std::forward(func), *finallyLoc); + + fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc)); + } else { + auto finallyLoc = + new typename Helper::Finally(std::forward(finally), *this); + auto funcLoc = + new typename Helper::Func(std::forward(func), *finallyLoc); + + fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc)); + } + + fiber->data_ = reinterpret_cast(fiber); + readyFibers_.push_back(*fiber); + if (observer_) { + observer_->runnable(reinterpret_cast(fiber)); + } + + ensureLoopScheduled(); +} + +template +typename std::result_of::type FiberManager::runInMainContext(F&& func) { + if (UNLIKELY(activeFiber_ == nullptr)) { + return func(); + } + + typedef typename std::result_of::type Result; + + folly::Try result; + auto f = [&func, &result]() mutable { + result = folly::makeTryWith(std::forward(func)); + }; + + immediateFunc_ = std::ref(f); + activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE); + + return std::move(result).value(); +} + +inline FiberManager& FiberManager::getFiberManager() { + assert(currentFiberManager_ != nullptr); + return *currentFiberManager_; +} + +inline FiberManager* FiberManager::getFiberManagerUnsafe() { + return currentFiberManager_; +} + +inline bool FiberManager::hasActiveFiber() const { + return activeFiber_ != nullptr; +} + +inline void FiberManager::yield() { + assert(currentFiberManager_ == this); + assert(activeFiber_ != nullptr); + assert(activeFiber_->state_ == Fiber::RUNNING); + activeFiber_->preempt(Fiber::YIELDED); +} + +template +T& FiberManager::local() { + if (std::type_index(typeid(T)) == localType_ && currentFiber_) { + return currentFiber_->localData_.get(); + } + return localThread(); +} + +template +T& FiberManager::localThread() { +#ifndef __APPLE__ + static thread_local T t; + return t; +#else // osx doesn't support thread_local + static ThreadLocal t; + return *t; +#endif +} + +inline void FiberManager::initLocalData(Fiber& fiber) { + auto fm = getFiberManagerUnsafe(); + if (fm && fm->currentFiber_ && fm->localType_ == localType_) { + fiber.localData_ = fm->currentFiber_->localData_; + } + fiber.rcontext_ = RequestContext::saveContext(); +} + +template +FiberManager::FiberManager( + LocalType, + std::unique_ptr loopController__, + Options options) + : loopController_(std::move(loopController__)), + stackAllocator_(options.useGuardPages), + options_(preprocessOptions(std::move(options))), + exceptionCallback_([](std::exception_ptr eptr, std::string context) { + try { + std::rethrow_exception(eptr); + } catch (const std::exception& e) { + LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '" + << e.what() << "' was thrown in " + << "FiberManager with context '" << context << "'"; + } catch (...) { + LOG(DFATAL) << "Unknown exception was thrown in FiberManager with " + << "context '" << context << "'"; + } + }), + timeoutManager_(std::make_shared(*loopController_)), + fibersPoolResizer_(*this), + localType_(typeid(LocalT)) { + loopController_->setFiberManager(this); +} + +template +typename FirstArgOf::type::value_type inline await(F&& func) { + typedef typename FirstArgOf::type::value_type Result; + typedef typename FirstArgOf::type::baton_type BatonT; + + return Promise::await(std::forward(func)); +} +} +} diff --git a/folly/fibers/FiberManagerInternal.h b/folly/fibers/FiberManagerInternal.h new file mode 100644 index 00000000..ecea97a9 --- /dev/null +++ b/folly/fibers/FiberManagerInternal.h @@ -0,0 +1,573 @@ +/* + * Copyright 2016 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace folly { + +template +class Future; + +namespace fibers { + +class Baton; +class Fiber; +class LoopController; +class TimeoutController; + +template +class LocalType {}; + +class InlineFunctionRunner { + public: + virtual ~InlineFunctionRunner() {} + + /** + * func must be executed inline and only once. + */ + virtual void run(folly::Function func) = 0; +}; + +/** + * @class FiberManager + * @brief Single-threaded task execution engine. + * + * FiberManager allows semi-parallel task execution on the same thread. Each + * task can notify FiberManager that it is blocked on something (via await()) + * call. This will pause execution of this task and it will be resumed only + * when it is unblocked (via setData()). + */ +class FiberManager : public ::folly::Executor { + public: + struct Options { + static constexpr size_t kDefaultStackSize{16 * 1024}; + + /** + * Maximum stack size for fibers which will be used for executing all the + * tasks. + */ + size_t stackSize{kDefaultStackSize}; + + /** + * Record exact amount of stack used. + * + * This is fairly expensive: we fill each newly allocated stack + * with some known value and find the boundary of unused stack + * with linear search every time we surrender the stack back to fibersPool. + * 0 disables stack recording. + */ + size_t recordStackEvery{0}; + + /** + * Keep at most this many free fibers in the pool. + * This way the total number of fibers in the system is always bounded + * by the number of active fibers + maxFibersPoolSize. + */ + size_t maxFibersPoolSize{1000}; + + /** + * Protect limited amount of fiber stacks with guard pages. + */ + bool useGuardPages{true}; + + /** + * Free unnecessary fibers in the fibers pool every fibersPoolResizePeriodMs + * milliseconds. If value is 0, periodic resizing of the fibers pool is + * disabled. + */ + uint32_t fibersPoolResizePeriodMs{0}; + + constexpr Options() {} + }; + + using ExceptionCallback = + folly::Function; + + FiberManager(const FiberManager&) = delete; + FiberManager& operator=(const FiberManager&) = delete; + + /** + * Initializes, but doesn't start FiberManager loop + * + * @param loopController + * @param options FiberManager options + */ + explicit FiberManager( + std::unique_ptr loopController, + Options options = Options()); + + /** + * Initializes, but doesn't start FiberManager loop + * + * @param loopController + * @param options FiberManager options + * @tparam LocalT only local of this type may be stored on fibers. + * Locals of other types will be considered thread-locals. + */ + template + FiberManager( + LocalType, + std::unique_ptr loopController, + Options options = Options()); + + ~FiberManager(); + + /** + * Controller access. + */ + LoopController& loopController(); + const LoopController& loopController() const; + + /** + * Keeps running ready tasks until the list of ready tasks is empty. + * + * @return True if there are any waiting tasks remaining. + */ + bool loopUntilNoReady(); + + /** + * @return true if there are outstanding tasks. + */ + bool hasTasks() const; + + /** + * Sets exception callback which will be called if any of the tasks throws an + * exception. + * + * @param ec + */ + void setExceptionCallback(ExceptionCallback ec); + + /** + * Add a new task to be executed. Must be called from FiberManager's thread. + * + * @param func Task functor; must have a signature of `void func()`. + * The object will be destroyed once task execution is complete. + */ + template + void addTask(F&& func); + + /** + * Add a new task to be executed and return a future that will be set on + * return from func. Must be called from FiberManager's thread. + * + * @param func Task functor; must have a signature of `void func()`. + * The object will be destroyed once task execution is complete. + */ + template + auto addTaskFuture(F&& func) -> folly::Future< + typename folly::Unit::Lift::type>::type>; + /** + * Add a new task to be executed. Safe to call from other threads. + * + * @param func Task function; must have a signature of `void func()`. + * The object will be destroyed once task execution is complete. + */ + template + void addTaskRemote(F&& func); + + /** + * Add a new task to be executed and return a future that will be set on + * return from func. Safe to call from other threads. + * + * @param func Task function; must have a signature of `void func()`. + * The object will be destroyed once task execution is complete. + */ + template + auto addTaskRemoteFuture(F&& func) -> folly::Future< + typename folly::Unit::Lift::type>::type>; + + // Executor interface calls addTaskRemote + void add(folly::Func f) override { + addTaskRemote(std::move(f)); + } + + /** + * Add a new task. When the task is complete, execute finally(Try&&) + * on the main context. + * + * @param func Task functor; must have a signature of `T func()` for some T. + * @param finally Finally functor; must have a signature of + * `void finally(Try&&)` and will be passed + * the result of func() (including the exception if occurred). + */ + template + void addTaskFinally(F&& func, G&& finally); + + /** + * If called from a fiber, immediately switches to the FiberManager's context + * and runs func(), going back to the Fiber's context after completion. + * Outside a fiber, just calls func() directly. + * + * @return value returned by func(). + */ + template + typename std::result_of::type runInMainContext(F&& func); + + /** + * Returns a refference to a fiber-local context for given Fiber. Should be + * always called with the same T for each fiber. Fiber-local context is lazily + * default-constructed on first request. + * When new task is scheduled via addTask / addTaskRemote from a fiber its + * fiber-local context is copied into the new fiber. + */ + template + T& local(); + + template + static T& localThread(); + + /** + * @return How many fiber objects (and stacks) has this manager allocated. + */ + size_t fibersAllocated() const; + + /** + * @return How many of the allocated fiber objects are currently + * in the free pool. + */ + size_t fibersPoolSize() const; + + /** + * return true if running activeFiber_ is not nullptr. + */ + bool hasActiveFiber() const; + + /** + * @return The currently running fiber or null if no fiber is executing. + */ + Fiber* currentFiber() const { + return currentFiber_; + } + + /** + * @return What was the most observed fiber stack usage (in bytes). + */ + size_t stackHighWatermark() const; + + /** + * Yield execution of the currently running fiber. Must only be called from a + * fiber executing on this FiberManager. The calling fiber will be scheduled + * when all other fibers have had a chance to run and the event loop is + * serviced. + */ + void yield(); + + /** + * Setup fibers execution observation/instrumentation. Fiber locals are + * available to observer. + * + * @param observer Fiber's execution observer. + */ + void setObserver(ExecutionObserver* observer); + + /** + * @return Current observer for this FiberManager. Returns nullptr + * if no observer has been set. + */ + ExecutionObserver* getObserver(); + + /** + * Setup fibers preempt runner. + */ + void setPreemptRunner(InlineFunctionRunner* preemptRunner); + + /** + * Returns an estimate of the number of fibers which are waiting to run (does + * not include fibers or tasks scheduled remotely). + */ + size_t runQueueSize() const { + return readyFibers_.size() + yieldedFibers_.size(); + } + + static FiberManager& getFiberManager(); + static FiberManager* getFiberManagerUnsafe(); + + private: + friend class Baton; + friend class Fiber; + template + struct AddTaskHelper; + template + struct AddTaskFinallyHelper; + + struct RemoteTask { + template + explicit RemoteTask(F&& f) + : func(std::forward(f)), rcontext(RequestContext::saveContext()) {} + template + RemoteTask(F&& f, const Fiber::LocalData& localData_) + : func(std::forward(f)), + localData(folly::make_unique(localData_)), + rcontext(RequestContext::saveContext()) {} + folly::Function func; + std::unique_ptr localData; + std::shared_ptr rcontext; + AtomicIntrusiveLinkedListHook nextRemoteTask; + }; + + intptr_t activateFiber(Fiber* fiber); + intptr_t deactivateFiber(Fiber* fiber); + + typedef folly::IntrusiveList FiberTailQueue; + typedef folly::IntrusiveList + GlobalFiberTailQueue; + + Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */ + /** + * Same as active fiber, but also set for functions run from fiber on main + * context. + */ + Fiber* currentFiber_{nullptr}; + + FiberTailQueue readyFibers_; /**< queue of fibers ready to be executed */ + FiberTailQueue yieldedFibers_; /**< queue of fibers which have yielded + execution */ + FiberTailQueue fibersPool_; /**< pool of unitialized Fiber objects */ + + GlobalFiberTailQueue allFibers_; /**< list of all Fiber objects owned */ + + size_t fibersAllocated_{0}; /**< total number of fibers allocated */ + size_t fibersPoolSize_{0}; /**< total number of fibers in the free pool */ + size_t fibersActive_{0}; /**< number of running or blocked fibers */ + size_t fiberId_{0}; /**< id of last fiber used */ + + /** + * Maximum number of active fibers in the last period lasting + * Options::fibersPoolResizePeriod milliseconds. + */ + size_t maxFibersActiveLastPeriod_{0}; + + FContext::ContextStruct mainContext_; /**< stores loop function context */ + + std::unique_ptr loopController_; + bool isLoopScheduled_{false}; /**< was the ready loop scheduled to run? */ + + /** + * When we are inside FiberManager loop this points to FiberManager. Otherwise + * it's nullptr + */ + static FOLLY_TLS FiberManager* currentFiberManager_; + + /** + * Allocator used to allocate stack for Fibers in the pool. + * Allocates stack on the stack of the main context. + */ + GuardPageAllocator stackAllocator_; + + const Options options_; /**< FiberManager options */ + + /** + * Largest observed individual Fiber stack usage in bytes. + */ + size_t stackHighWatermark_{0}; + + /** + * Schedules a loop with loopController (unless already scheduled before). + */ + void ensureLoopScheduled(); + + /** + * @return An initialized Fiber object from the pool + */ + Fiber* getFiber(); + + /** + * Sets local data for given fiber if all conditions are met. + */ + void initLocalData(Fiber& fiber); + + /** + * Function passed to the await call. + */ + folly::Function awaitFunc_; + + /** + * Function passed to the runInMainContext call. + */ + folly::Function immediateFunc_; + + /** + * Preempt runner. + */ + InlineFunctionRunner* preemptRunner_{nullptr}; + + /** + * Fiber's execution observer. + */ + ExecutionObserver* observer_{nullptr}; + + ExceptionCallback exceptionCallback_; /**< task exception callback */ + + folly::AtomicIntrusiveLinkedList + remoteReadyQueue_; + + folly::AtomicIntrusiveLinkedList + remoteTaskQueue_; + + std::shared_ptr timeoutManager_; + + struct FibersPoolResizer { + explicit FibersPoolResizer(FiberManager& fm) : fiberManager_(fm) {} + void operator()(); + + private: + FiberManager& fiberManager_; + }; + + FibersPoolResizer fibersPoolResizer_; + bool fibersPoolResizerScheduled_{false}; + + void doFibersPoolResizing(); + + /** + * Only local of this type will be available for fibers. + */ + std::type_index localType_; + + void runReadyFiber(Fiber* fiber); + void remoteReadyInsert(Fiber* fiber); + +#ifdef FOLLY_SANITIZE_ADDRESS + + // These methods notify ASAN when a fiber is entered/exited so that ASAN can + // find the right stack extents when it needs to poison/unpoison the stack. + + void registerFiberActivationWithAsan(Fiber* fiber); + void registerFiberDeactivationWithAsan(Fiber* fiber); + void unpoisonFiberStack(const Fiber* fiber); + +#endif // FOLLY_SANITIZE_ADDRESS + +#ifndef _WIN32 + bool alternateSignalStackRegistered_{false}; + + void registerAlternateSignalStack(); +#endif +}; + +/** + * @return true iff we are running in a fiber's context + */ +inline bool onFiber() { + auto fm = FiberManager::getFiberManagerUnsafe(); + return fm ? fm->hasActiveFiber() : false; +} + +/** + * Add a new task to be executed. + * + * @param func Task functor; must have a signature of `void func()`. + * The object will be destroyed once task execution is complete. + */ +template +inline void addTask(F&& func) { + return FiberManager::getFiberManager().addTask(std::forward(func)); +} + +/** + * Add a new task. When the task is complete, execute finally(Try&&) + * on the main context. + * Task functor is run and destroyed on the fiber context. + * Finally functor is run and destroyed on the main context. + * + * @param func Task functor; must have a signature of `T func()` for some T. + * @param finally Finally functor; must have a signature of + * `void finally(Try&&)` and will be passed + * the result of func() (including the exception if occurred). + */ +template +inline void addTaskFinally(F&& func, G&& finally) { + return FiberManager::getFiberManager().addTaskFinally( + std::forward(func), std::forward(finally)); +} + +/** + * Blocks task execution until given promise is fulfilled. + * + * Calls function passing in a Promise, which has to be fulfilled. + * + * @return data which was used to fulfill the promise. + */ +template +typename FirstArgOf::type::value_type inline await(F&& func); + +/** + * If called from a fiber, immediately switches to the FiberManager's context + * and runs func(), going back to the Fiber's context after completion. + * Outside a fiber, just calls func() directly. + * + * @return value returned by func(). + */ +template +typename std::result_of::type inline runInMainContext(F&& func) { + auto fm = FiberManager::getFiberManagerUnsafe(); + if (UNLIKELY(fm == nullptr)) { + return func(); + } + return fm->runInMainContext(std::forward(func)); +} + +/** + * Returns a refference to a fiber-local context for given Fiber. Should be + * always called with the same T for each fiber. Fiber-local context is lazily + * default-constructed on first request. + * When new task is scheduled via addTask / addTaskRemote from a fiber its + * fiber-local context is copied into the new fiber. + */ +template +T& local() { + auto fm = FiberManager::getFiberManagerUnsafe(); + if (fm) { + return fm->local(); + } + return FiberManager::localThread(); +} + +inline void yield() { + auto fm = FiberManager::getFiberManagerUnsafe(); + if (fm) { + fm->yield(); + } else { + std::this_thread::yield(); + } +} +} +} + +#include diff --git a/folly/fibers/FiberManagerMap.h b/folly/fibers/FiberManagerMap.h index b210b735..d76bcd6b 100644 --- a/folly/fibers/FiberManagerMap.h +++ b/folly/fibers/FiberManagerMap.h @@ -16,7 +16,7 @@ #pragma once #include -#include +#include namespace folly { namespace fibers { diff --git a/folly/fibers/ForEach-inl.h b/folly/fibers/ForEach-inl.h index 4c6df91a..4144b9b4 100644 --- a/folly/fibers/ForEach-inl.h +++ b/folly/fibers/ForEach-inl.h @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include +#include namespace folly { namespace fibers { diff --git a/folly/fibers/WhenN-inl.h b/folly/fibers/WhenN-inl.h index 4c49da8c..fef78e86 100644 --- a/folly/fibers/WhenN-inl.h +++ b/folly/fibers/WhenN-inl.h @@ -15,7 +15,7 @@ */ #include -#include +#include #include namespace folly { -- 2.34.1