2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/Memory.h>
21 #include <folly/Optional.h>
22 #include <folly/Portability.h>
23 #include <folly/ScopeGuard.h>
24 #include <folly/experimental/fibers/Baton.h>
25 #include <folly/experimental/fibers/Fiber.h>
26 #include <folly/experimental/fibers/Promise.h>
27 #include <folly/experimental/fibers/LoopController.h>
28 #include <folly/futures/Try.h>
30 namespace folly { namespace fibers {
// Records in isLoopScheduled_ that a loop run is pending and asks the loop
// controller to schedule one.
// NOTE(review): the numbering embedded in this listing jumps from 33 to 37,
// so the body of the isLoopScheduled_ check is not visible here. Presumably
// it is an early return that makes repeated calls no-ops; confirm against
// the full file.
32 inline void FiberManager::ensureLoopScheduled() {
33 if (isLoopScheduled_) {
37 isLoopScheduled_ = true;
38 loopController_->schedule();
// Switches into `fiber` for as long as it comes back in a runnable state,
// then handles the state it stopped in (AWAITING, INVALID or YIELDED).
// NOTE(review): the numbering embedded in this listing has gaps (for example
// 41->43, 58->62, 69->75), so several statements and every closing brace of
// this function are not visible. The comments below describe visible lines
// only.
41 inline void FiberManager::runReadyFiber(Fiber* fiber) {
// Both checks require that no fiber is current or active.
// NOTE(review): the line that introduces these asserts (42) is missing, so
// it cannot be told from here at which point they are evaluated.
43 assert(currentFiber_ == nullptr);
44 assert(activeFiber_ == nullptr);
// Precondition: only a fiber that has not started yet, or one that is ready
// to run, may be passed in.
47 assert(fiber->state_ == Fiber::NOT_STARTED ||
48 fiber->state_ == Fiber::READY_TO_RUN);
49 currentFiber_ = fiber;
// NOTE(review): line 50 is missing; presumably a null check on observer_.
51 observer_->starting();
// Re-enter the fiber while its state is still one of the runnable ones.
54 while (fiber->state_ == Fiber::NOT_STARTED ||
55 fiber->state_ == Fiber::READY_TO_RUN) {
// Presumably transfers control from mainContext_ to the fiber's context,
// passing fiber->data_ along; jumpContext is defined elsewhere.
57 jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
58 if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
// NOTE(review): lines 59-61 are missing; presumably they invoke
// immediateFunc_ inside a try block whose handler is the call below.
62 exceptionCallback_(std::current_exception(), "running immediateFunc_");
// Clear the one-shot callback and mark the fiber runnable again, which
// satisfies the loop condition above.
64 immediateFunc_ = nullptr;
65 fiber->state_ = Fiber::READY_TO_RUN;
69 if (fiber->state_ == Fiber::AWAITING) {
// NOTE(review): lines 70-74 are missing from this listing.
75 currentFiber_ = nullptr;
76 } else if (fiber->state_ == Fiber::INVALID) {
77 assert(fibersActive_ > 0);
79 // Making sure that task functor is deleted once task is complete.
80 // NOTE: we must do it on main context, as the fiber is not
81 // running at this point.
82 fiber->func_ = nullptr;
83 fiber->resultFunc_ = nullptr;
84 if (fiber->finallyFunc_) {
// NOTE(review): the try/catch lines around this call (85, 87) are missing;
// the exceptionCallback_ call below is presumably the catch handler.
86 fiber->finallyFunc_();
88 exceptionCallback_(std::current_exception(), "running finallyFunc_");
90 fiber->finallyFunc_ = nullptr;
92 // Make sure LocalData is not accessible from its destructor
96 currentFiber_ = nullptr;
97 fiber->localData_.reset();
// The finished fiber is pushed onto fibersPool_ (presumably for reuse) only
// while the pool is below options_.maxFibersPoolSize.
// NOTE(review): the alternative branch (lines 101-106) is only partly
// visible.
99 if (fibersPoolSize_ < options_.maxFibersPoolSize) {
100 fibersPool_.push_front(*fiber);
104 assert(fibersAllocated_ > 0);
107 } else if (fiber->state_ == Fiber::YIELDED) {
// NOTE(review): line 108 is missing; presumably a null check on observer_.
109 observer_->stopped();
// A yielded fiber becomes runnable again but is parked on yieldedFibers_
// rather than on readyFibers_.
111 currentFiber_ = nullptr;
112 fiber->state_ = Fiber::READY_TO_RUN;
113 yieldedFibers_.push_back(*fiber);
// Runs fibers until none are ready: drains readyFibers_, sweeps
// remoteReadyQueue_ and remoteTaskQueue_, and (as far as the visible lines
// show) repeats while a sweep handled at least one entry. Returns whether
// fibersActive_ is still positive.
// NOTE(review): the numbering embedded in this listing has gaps, so closing
// braces and some scope-introducing lines are not visible here.
117 inline bool FiberManager::loopUntilNoReady() {
// NOTE(review): line 118 is missing. Lines 119-123 reset state that line 126
// sets right afterwards, so they may sit inside a scope opened on line 118
// (for example deferred cleanup); confirm against the full file.
119 isLoopScheduled_ = false;
120 if (!readyFibers_.empty()) {
121 ensureLoopScheduled();
123 currentFiberManager_ = nullptr;
126 currentFiberManager_ = this;
// Starts out true so that the loop body executes at least once.
128 bool hadRemoteFiber = true;
129 while (hadRemoteFiber) {
130 hadRemoteFiber = false;
// Run every fiber that is already queued locally.
132 while (!readyFibers_.empty()) {
// The reference is taken before pop_front() and used after it.
// NOTE(review): this is only valid if the list does not own its elements
// (presumably an intrusive list); confirm the type of readyFibers_.
133 auto& fiber = readyFibers_.front();
134 readyFibers_.pop_front();
135 runReadyFiber(&fiber);
// Run every fiber delivered through remoteReadyQueue_.
138 remoteReadyQueue_.sweep(
139 [this, &hadRemoteFiber] (Fiber* fiber) {
140 runReadyFiber(fiber);
141 hadRemoteFiber = true;
// Turn each queued RemoteTask into a fiber and run it immediately.
145 remoteTaskQueue_.sweep(
146 [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
// The raw pointer handed out by the queue is wrapped in a unique_ptr.
147 std::unique_ptr<RemoteTask> task(taskPtr);
148 auto fiber = getFiber();
// Local data attached to the task, if any, is assigned to the new fiber.
149 if (task->localData) {
150 fiber->localData_ = *task->localData;
153 fiber->setFunction(std::move(task->func));
154 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
155 runReadyFiber(fiber);
156 hadRemoteFiber = true;
// Move everything parked on yieldedFibers_ to the tail of readyFibers_.
161 readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
163 return fibersActive_ > 0;
166 // We need this to be in a struct, not inlined in addTask, because clang crashes
// (The rest of the sentence above is on a line missing from this listing.)
//
// Helper used by addTask(): provides a nested Func type built from the user
// functor and a FiberManager, plus a flag saying whether Func fits in a
// fiber's user buffer.
// NOTE(review): most of this struct (the Func class head, its call operator,
// its members and all closing braces) is on lines missing from this listing.
168 template <typename F>
169 struct FiberManager::AddTaskHelper {
// True when a Func object is no larger than Fiber::kUserBufferSize.
172 static constexpr bool allocateInBuffer =
173 sizeof(Func) <= Fiber::kUserBufferSize;
// Initializes func_ from the forwarded functor and fm_ from `fm`.
177 Func(F&& func, FiberManager& fm) :
178 func_(std::forward<F>(func)), fm_(fm) {}
// An in-flight exception is reported to the manager's exceptionCallback_
// with this context string.
// NOTE(review): the surrounding try/catch is on missing lines.
184 fm_.exceptionCallback_(std::current_exception(),
185 "running Func functor");
// NOTE(review): both branches of this check are missing. addTask()
// placement-constructs Func when allocateInBuffer is true and uses `new`
// otherwise, so presumably the cleanup here differs accordingly; confirm.
187 if (allocateInBuffer) {
// Wraps `func` in an AddTaskHelper<F>::Func, attaches it to a fiber obtained
// from getFiber(), appends that fiber to readyFibers_ and makes sure a loop
// run is scheduled.
// NOTE(review): the `else` line and the closing braces are on lines missing
// from this listing.
200 template <typename F>
201 void FiberManager::addTask(F&& func) {
202 typedef AddTaskHelper<F> Helper;
204 auto fiber = getFiber();
205 initLocalData(*fiber);
// Wrappers that fit are constructed in place inside the fiber's user buffer.
207 if (Helper::allocateInBuffer) {
208 auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
209 new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
// The fiber receives a reference wrapper to the object, not a copy of it.
211 fiber->setFunction(std::ref(*funcLoc));
// Otherwise the wrapper is allocated with `new`. No matching delete is
// visible in this function.
// NOTE(review): presumably Func releases itself after running (see the
// allocateInBuffer check in AddTaskHelper); confirm.
213 auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
215 fiber->setFunction(std::ref(*funcLoc));
// The fiber's own address is stored in data_; runReadyFiber() passes data_
// to jumpContext().
218 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
219 readyFibers_.push_back(*fiber);
221 ensureLoopScheduled();
// Builds a RemoteTask from `func`, inserts it at the head of
// remoteTaskQueue_, and calls loopController_->scheduleThreadSafe() when
// insertHead() returns true.
// NOTE(review): lines 226 and 228 are missing. The `return` statements below
// together with the later use of `task` suggest that `task` is produced by an
// immediately-invoked lambda; confirm against the full file.
224 template <typename F>
225 void FiberManager::addTaskRemote(F&& func) {
227 auto currentFm = getFiberManagerUnsafe();
// When the calling thread's manager has a current fiber and uses the same
// localType_, the task is constructed with that fiber's localData_.
// NOTE(review): the first half of this condition (line 228) is missing;
// presumably it null-checks currentFm.
229 currentFm->currentFiber_ &&
230 currentFm->localType_ == localType_) {
231 return folly::make_unique<RemoteTask>(
232 std::forward<F>(func),
233 currentFm->currentFiber_->localData_);
// Otherwise the task is constructed from the functor alone.
235 return folly::make_unique<RemoteTask>(std::forward<F>(func));
// The task is handed to the queue as a raw pointer; loopUntilNoReady() wraps
// swept pointers in a unique_ptr again.
237 if (remoteTaskQueue_.insertHead(task.release())) {
238 loopController_->scheduleThreadSafe();
242 template <typename X>
243 struct IsRvalueRefTry { static const bool value = false; };
244 template <typename T>
245 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
247 // We need this to be in a struct, not inlined in addTaskFinally, because clang
248 // crashes otherwise.
//
// Helper used by addTaskFinally(): provides the nested Func and Finally
// types and a flag saying whether both fit in a fiber's user buffer.
// NOTE(review): most of this struct (class heads, call operators, several
// members and all closing braces) is on lines missing from this listing.
249 template <typename F, typename G>
250 struct FiberManager::AddTaskFinallyHelper {
// Return type of calling F with no arguments.
254 typedef typename std::result_of<F()>::type Result;
// True when Func and Finally together are no larger than
// Fiber::kUserBufferSize. The sum does not include any alignment padding.
256 static constexpr bool allocateInBuffer =
257 sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
263 finally_(std::move(finally)),
// The stored Try is moved into the user's finally callback.
269 finally_(std::move(*result_));
// An in-flight exception is reported to exceptionCallback_ with this
// context string.
// NOTE(review): the surrounding try/catch is on missing lines.
271 fm_.exceptionCallback_(std::current_exception(),
272 "running Finally functor");
275 if (allocateInBuffer) {
// Owning storage for the task's outcome (presumably a member of Finally,
// since Func's constructor below reads finally.result_).
286 folly::Optional<folly::Try<Result>> result_;
// Func binds its result_ reference to the result_ member of `finally`.
292 Func(F&& func, Finally& finally) :
293 func_(std::move(func)), result_(finally.result_) {}
// Stores whatever folly::makeTryFunction produces for the task functor.
296 result_ = folly::makeTryFunction(std::move(func_));
298 if (allocateInBuffer) {
// Reference member: refers to storage owned elsewhere (see the constructor
// above).
307 folly::Optional<folly::Try<Result>>& result_;
// Attaches `func` and a `finally` callback to a fiber obtained from
// getFiber(), appends that fiber to readyFibers_ and makes sure a loop run is
// scheduled. The message strings below state the compile-time requirements:
// finally's first argument must be Try<T>&&, and T must be convertible from
// func()'s return type.
// NOTE(review): the lines that open the two compile-time checks, the `else`
// line and the closing braces are missing from this listing.
311 template <typename F, typename G>
312 void FiberManager::addTaskFinally(F&& func, G&& finally) {
313 typedef typename std::result_of<F()>::type Result;
316 IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
317 "finally(arg): arg must be Try<T>&&");
321 typename std::remove_reference<
322 typename FirstArgOf<G>::type
323 >::type::element_type
325 "finally(Try<T>&&): T must be convertible from func()'s return type");
327 auto fiber = getFiber();
328 initLocalData(*fiber);
330 typedef AddTaskFinallyHelper<F,G> Helper;
332 if (Helper::allocateInBuffer) {
// In-buffer layout: Func sits at the start of the user buffer and Finally
// directly after it (funcLoc + 1).
// NOTE(review): no alignment adjustment for Finally is visible here.
333 auto funcLoc = static_cast<typename Helper::Func*>(
334 fiber->getUserBuffer());
335 auto finallyLoc = static_cast<typename Helper::Finally*>(
336 static_cast<void*>(funcLoc + 1));
// Finally is constructed first because Func's constructor takes a reference
// to it.
// NOTE(review): func and finally are forwarding references but are passed
// with std::move, so lvalue arguments are moved from as well.
338 new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
339 new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
341 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
// Otherwise both objects are allocated with `new`. No matching delete is
// visible in this function.
343 auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
344 auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
346 fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
// The fiber's own address is stored in data_; runReadyFiber() passes data_
// to jumpContext().
349 fiber->data_ = reinterpret_cast<intptr_t>(fiber);
350 readyFibers_.push_back(*fiber);
352 ensureLoopScheduled();
// Forwards `func` to runInMainContextHelper() and returns its result. The
// helper has two overloads, chosen by whether F() returns void.
355 template <typename F>
356 typename std::result_of<F()>::type
357 FiberManager::runInMainContext(F&& func) {
358 return runInMainContextHelper(std::forward<F>(func));
// Overload enabled when F() returns a non-void type. Installs a callback in
// immediateFunc_ that stores func's outcome in a local Try, preempts the
// active fiber into AWAITING_IMMEDIATE, and afterwards returns the value
// moved out of that Try.
361 template <typename F>
362 inline typename std::enable_if<
363 !std::is_same<typename std::result_of<F()>::type, void>::value,
364 typename std::result_of<F()>::type>::type
365 FiberManager::runInMainContextHelper(F&& func) {
// NOTE(review): the body of this branch (lines 367-369) is missing;
// presumably func is called directly when no fiber is active.
366 if (UNLIKELY(activeFiber_ == nullptr)) {
370 typedef typename std::result_of<F()>::type Result;
372 folly::Try<Result> result;
// `f` captures func and result by reference, and std::ref(f) below makes
// immediateFunc_ refer to this stack object. The callback therefore has to
// run before this function returns.
373 auto f = [&func, &result]() mutable {
374 result = folly::makeTryFunction(std::forward<F>(func));
377 immediateFunc_ = std::ref(f);
// runReadyFiber() handles AWAITING_IMMEDIATE: it clears immediateFunc_ and
// makes the fiber READY_TO_RUN again.
378 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
380 return std::move(result.value());
// Overload enabled when F() returns void. Installs a reference to `func` in
// immediateFunc_ and preempts the active fiber into AWAITING_IMMEDIATE.
// NOTE(review): line 386 (the second enable_if argument) is missing from
// this listing.
383 template <typename F>
384 inline typename std::enable_if<
385 std::is_same<typename std::result_of<F()>::type, void>::value,
387 FiberManager::runInMainContextHelper(F&& func) {
// NOTE(review): the body of this branch (lines 389-392) is missing;
// presumably func is called directly when no fiber is active.
388 if (UNLIKELY(activeFiber_ == nullptr)) {
// std::ref means immediateFunc_ refers to the caller's functor rather than
// holding a copy of it.
393 immediateFunc_ = std::ref(func);
394 activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
// Returns the manager stored in currentFiberManager_. Asserts that it is
// non-null; use getFiberManagerUnsafe() when it may legitimately be null.
397 inline FiberManager& FiberManager::getFiberManager() {
398 assert(currentFiberManager_ != nullptr);
399 return *currentFiberManager_;
// Returns currentFiberManager_ as-is, without any check. The result can be
// null: loopUntilNoReady() assigns nullptr to it.
402 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
403 return currentFiberManager_;
// Returns true when activeFiber_ is non-null.
406 inline bool FiberManager::hasActiveFiber() const {
407 return activeFiber_ != nullptr;
// Preempts the active fiber with state YIELDED. runReadyFiber() then marks
// such a fiber READY_TO_RUN and appends it to yieldedFibers_.
// Preconditions (asserted): this manager is the current one, a fiber is
// active, and that fiber is in state RUNNING.
410 inline void FiberManager::yield() {
411 assert(currentFiberManager_ == this);
412 assert(activeFiber_ != nullptr);
413 assert(activeFiber_->state_ == Fiber::RUNNING);
414 activeFiber_->preempt(Fiber::YIELDED);
// Returns a reference to a T. When T is the type recorded in localType_ and
// a fiber is current, the T comes from that fiber's localData_; otherwise it
// comes from localThread<T>().
417 template <typename T>
418 T& FiberManager::local() {
419 if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
420 return currentFiber_->localData_.get<T>();
422 return localThread<T>();
// Defines one default-constructed T per thread (function-local
// `static thread_local`).
// NOTE(review): the return statement is on a line missing from this listing;
// presumably it returns a reference to `t`.
425 template <typename T>
426 T& FiberManager::localThread() {
427 static thread_local T t;
// Assigns the current fiber's localData_ to `fiber` when
// getFiberManagerUnsafe() returns a manager that has a current fiber and the
// same localType_ as this manager. Otherwise `fiber` is left untouched by
// the visible code.
431 inline void FiberManager::initLocalData(Fiber& fiber) {
432 auto fm = getFiberManagerUnsafe();
433 if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
434 fiber.localData_ = fm->currentFiber_->localData_;
// Constructor template. LocalT selects the type recorded in localType_.
// Takes ownership of the loop controller, stores the options, installs a
// default exception callback that logs at DFATAL, creates the timeout
// manager, and registers this manager with the loop controller.
// NOTE(review): some parameter lines (440, 442) and the try / catch (...)
// lines inside the lambda are missing from this listing.
438 template <typename LocalT>
439 FiberManager::FiberManager(
441 std::unique_ptr<LoopController> loopController__,
443 loopController_(std::move(loopController__)),
444 options_(std::move(options)),
// Default callback: rethrows the exception to learn its type, then logs it
// together with the context string supplied by the caller.
445 exceptionCallback_([](std::exception_ptr eptr, std::string context) {
447 std::rethrow_exception(eptr);
448 } catch (const std::exception& e) {
449 LOG(DFATAL) << "Exception " << typeid(e).name()
450 << " with message '" << e.what() << "' was thrown in "
451 << "FiberManager with context '" << context << "'";
// Logged for exceptions that the handler above did not match.
454 LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
455 << "context '" << context << "'";
// Dereferences the member loopController_, not the moved-from parameter.
// NOTE(review): this relies on loopController_ being declared before
// timeoutManager_ in the class; confirm in the header.
459 timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
460 localType_(typeid(LocalT)) {
461 loopController_->setFiberManager(this);
// Free function. Calls `func` with a Promise<Result> built from a local Try
// and a baton, from inside the callable passed to baton.wait(), and then
// returns folly::moveFromTry() of that Try. Result is the value_type of the
// first argument type of F.
// NOTE(review): the declaration of `baton` (lines 470-471) is missing from
// this listing.
464 template <typename F>
465 typename FirstArgOf<F>::type::value_type
466 inline await(F&& func) {
467 typedef typename FirstArgOf<F>::type::value_type Result;
469 folly::Try<Result> result;
// Everything is captured by reference, so the callable must not outlive this
// stack frame.
472 baton.wait([&func, &result, &baton]() mutable {
473 func(Promise<Result>(result, baton));
476 return folly::moveFromTry(std::move(result));