6cdb5f84d6b01571344bbea35f463b7ca9607092
[folly.git] / folly / fibers / FiberManagerInternal-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
25 #ifdef __APPLE__
26 #include <folly/ThreadLocal.h>
27 #endif
28 #include <folly/fibers/Baton.h>
29 #include <folly/fibers/Fiber.h>
30 #include <folly/fibers/LoopController.h>
31 #include <folly/fibers/Promise.h>
32 #include <folly/Try.h>
33
34 namespace folly {
35 namespace fibers {
36
37 namespace {
38
39 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
40 #ifdef FOLLY_SANITIZE_ADDRESS
41   /* ASAN needs a lot of extra stack space.
42      16x is a conservative estimate, 8x also worked with tests
43      where it mattered.  Note that overallocating here does not necessarily
44      increase RSS, since unused memory is pretty much free. */
45   opts.stackSize *= 16;
46 #endif
47   return opts;
48 }
49
50 } // anonymous
51
52 inline void FiberManager::ensureLoopScheduled() {
53   if (isLoopScheduled_) {
54     return;
55   }
56
57   isLoopScheduled_ = true;
58   loopController_->schedule();
59 }
60
61 inline void FiberManager::activateFiber(Fiber* fiber) {
62   DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
63
64 #ifdef FOLLY_SANITIZE_ADDRESS
65   DCHECK(!fiber->asanMainStackBase_);
66   DCHECK(!fiber->asanMainStackSize_);
67   auto stack = fiber->getStack();
68   void* asanFakeStack;
69   registerStartSwitchStackWithAsan(&asanFakeStack, stack.first, stack.second);
70   SCOPE_EXIT {
71     registerFinishSwitchStackWithAsan(asanFakeStack, nullptr, nullptr);
72     fiber->asanMainStackBase_ = nullptr;
73     fiber->asanMainStackSize_ = 0;
74   };
75 #endif
76
77   activeFiber_ = fiber;
78   jumpContext(
79       &mainContext_, &fiber->fcontext_, reinterpret_cast<intptr_t>(fiber));
80 }
81
82 inline void FiberManager::deactivateFiber(Fiber* fiber) {
83   DCHECK_EQ(activeFiber_, fiber);
84
85 #ifdef FOLLY_SANITIZE_ADDRESS
86   DCHECK(fiber->asanMainStackBase_);
87   DCHECK(fiber->asanMainStackSize_);
88
89   // Release fake stack if fiber is completed
90   auto saveFakeStackPtr =
91       fiber->state_ == Fiber::INVALID ? nullptr : &fiber->asanFakeStack_;
92   registerStartSwitchStackWithAsan(
93       saveFakeStackPtr, fiber->asanMainStackBase_, fiber->asanMainStackSize_);
94   SCOPE_EXIT {
95     registerFinishSwitchStackWithAsan(
96         fiber->asanFakeStack_,
97         &fiber->asanMainStackBase_,
98         &fiber->asanMainStackSize_);
99     fiber->asanFakeStack_ = nullptr;
100   };
101 #endif
102
103   activeFiber_ = nullptr;
104   auto context = jumpContext(&fiber->fcontext_, &mainContext_, 0);
105   DCHECK_EQ(fiber, reinterpret_cast<Fiber*>(context));
106 }
107
108 inline void FiberManager::runReadyFiber(Fiber* fiber) {
109   SCOPE_EXIT {
110     assert(currentFiber_ == nullptr);
111     assert(activeFiber_ == nullptr);
112   };
113
114   assert(
115       fiber->state_ == Fiber::NOT_STARTED ||
116       fiber->state_ == Fiber::READY_TO_RUN);
117   currentFiber_ = fiber;
118   fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
119   if (observer_) {
120     observer_->starting(reinterpret_cast<uintptr_t>(fiber));
121   }
122
123   while (fiber->state_ == Fiber::NOT_STARTED ||
124          fiber->state_ == Fiber::READY_TO_RUN) {
125     activateFiber(fiber);
126     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
127       try {
128         immediateFunc_();
129       } catch (...) {
130         exceptionCallback_(std::current_exception(), "running immediateFunc_");
131       }
132       immediateFunc_ = nullptr;
133       fiber->state_ = Fiber::READY_TO_RUN;
134     }
135   }
136
137   if (fiber->state_ == Fiber::AWAITING) {
138     awaitFunc_(*fiber);
139     awaitFunc_ = nullptr;
140     if (observer_) {
141       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
142     }
143     currentFiber_ = nullptr;
144     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
145   } else if (fiber->state_ == Fiber::INVALID) {
146     assert(fibersActive_ > 0);
147     --fibersActive_;
148     // Making sure that task functor is deleted once task is complete.
149     // NOTE: we must do it on main context, as the fiber is not
150     // running at this point.
151     fiber->func_ = nullptr;
152     fiber->resultFunc_ = nullptr;
153     if (fiber->finallyFunc_) {
154       try {
155         fiber->finallyFunc_();
156       } catch (...) {
157         exceptionCallback_(std::current_exception(), "running finallyFunc_");
158       }
159       fiber->finallyFunc_ = nullptr;
160     }
161     // Make sure LocalData is not accessible from its destructor
162     if (observer_) {
163       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
164     }
165     currentFiber_ = nullptr;
166     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
167     fiber->localData_.reset();
168     fiber->rcontext_.reset();
169
170     if (fibersPoolSize_ < options_.maxFibersPoolSize ||
171         options_.fibersPoolResizePeriodMs > 0) {
172       fibersPool_.push_front(*fiber);
173       ++fibersPoolSize_;
174     } else {
175       delete fiber;
176       assert(fibersAllocated_ > 0);
177       --fibersAllocated_;
178     }
179   } else if (fiber->state_ == Fiber::YIELDED) {
180     if (observer_) {
181       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
182     }
183     currentFiber_ = nullptr;
184     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
185     fiber->state_ = Fiber::READY_TO_RUN;
186     yieldedFibers_.push_back(*fiber);
187   }
188 }
189
190 inline bool FiberManager::loopUntilNoReady() {
191 #ifndef _WIN32
192   if (UNLIKELY(!alternateSignalStackRegistered_)) {
193     registerAlternateSignalStack();
194   }
195 #endif
196
197   // Support nested FiberManagers
198   auto originalFiberManager = this;
199   std::swap(currentFiberManager_, originalFiberManager);
200
201   SCOPE_EXIT {
202     isLoopScheduled_ = false;
203     if (!readyFibers_.empty()) {
204       ensureLoopScheduled();
205     }
206     std::swap(currentFiberManager_, originalFiberManager);
207     CHECK_EQ(this, originalFiberManager);
208   };
209
210   bool hadRemoteFiber = true;
211   while (hadRemoteFiber) {
212     hadRemoteFiber = false;
213
214     while (!readyFibers_.empty()) {
215       auto& fiber = readyFibers_.front();
216       readyFibers_.pop_front();
217       runReadyFiber(&fiber);
218     }
219
220     remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
221       runReadyFiber(fiber);
222       hadRemoteFiber = true;
223     });
224
225     remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
226       std::unique_ptr<RemoteTask> task(taskPtr);
227       auto fiber = getFiber();
228       if (task->localData) {
229         fiber->localData_ = *task->localData;
230       }
231       fiber->rcontext_ = std::move(task->rcontext);
232
233       fiber->setFunction(std::move(task->func));
234       if (observer_) {
235         observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
236       }
237       runReadyFiber(fiber);
238       hadRemoteFiber = true;
239     });
240   }
241
242   if (observer_) {
243     for (auto& yielded : yieldedFibers_) {
244       observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
245     }
246   }
247   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
248
249   return fibersActive_ > 0;
250 }
251
252 // We need this to be in a struct, not inlined in addTask, because clang crashes
253 // otherwise.
254 template <typename F>
255 struct FiberManager::AddTaskHelper {
256   class Func;
257
258   static constexpr bool allocateInBuffer =
259       sizeof(Func) <= Fiber::kUserBufferSize;
260
261   class Func {
262    public:
263     Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
264
265     void operator()() {
266       try {
267         func_();
268       } catch (...) {
269         fm_.exceptionCallback_(
270             std::current_exception(), "running Func functor");
271       }
272       if (allocateInBuffer) {
273         this->~Func();
274       } else {
275         delete this;
276       }
277     }
278
279    private:
280     F func_;
281     FiberManager& fm_;
282   };
283 };
284
285 template <typename F>
286 void FiberManager::addTask(F&& func) {
287   typedef AddTaskHelper<F> Helper;
288
289   auto fiber = getFiber();
290   initLocalData(*fiber);
291
292   if (Helper::allocateInBuffer) {
293     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
294     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
295
296     fiber->setFunction(std::ref(*funcLoc));
297   } else {
298     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
299
300     fiber->setFunction(std::ref(*funcLoc));
301   }
302
303   readyFibers_.push_back(*fiber);
304   if (observer_) {
305     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
306   }
307
308   ensureLoopScheduled();
309 }
310
311 template <typename F>
312 void FiberManager::addTaskRemote(F&& func) {
313   auto task = [&]() {
314     auto currentFm = getFiberManagerUnsafe();
315     if (currentFm && currentFm->currentFiber_ &&
316         currentFm->localType_ == localType_) {
317       return folly::make_unique<RemoteTask>(
318           std::forward<F>(func), currentFm->currentFiber_->localData_);
319     }
320     return folly::make_unique<RemoteTask>(std::forward<F>(func));
321   }();
322   auto insertHead = [&]() {
323     return remoteTaskQueue_.insertHead(task.release());
324   };
325   loopController_->scheduleThreadSafe(std::ref(insertHead));
326 }
327
328 template <typename X>
329 struct IsRvalueRefTry {
330   static const bool value = false;
331 };
332 template <typename T>
333 struct IsRvalueRefTry<folly::Try<T>&&> {
334   static const bool value = true;
335 };
336
337 // We need this to be in a struct, not inlined in addTaskFinally, because clang
338 // crashes otherwise.
339 template <typename F, typename G>
340 struct FiberManager::AddTaskFinallyHelper {
341   class Func;
342
343   typedef typename std::result_of<F()>::type Result;
344
345   class Finally {
346    public:
347     Finally(G finally, FiberManager& fm)
348         : finally_(std::move(finally)), fm_(fm) {}
349
350     void operator()() {
351       try {
352         finally_(std::move(*result_));
353       } catch (...) {
354         fm_.exceptionCallback_(
355             std::current_exception(), "running Finally functor");
356       }
357
358       if (allocateInBuffer) {
359         this->~Finally();
360       } else {
361         delete this;
362       }
363     }
364
365    private:
366     friend class Func;
367
368     G finally_;
369     folly::Optional<folly::Try<Result>> result_;
370     FiberManager& fm_;
371   };
372
373   class Func {
374    public:
375     Func(F func, Finally& finally)
376         : func_(std::move(func)), result_(finally.result_) {}
377
378     void operator()() {
379       result_ = folly::makeTryWith(std::move(func_));
380
381       if (allocateInBuffer) {
382         this->~Func();
383       } else {
384         delete this;
385       }
386     }
387
388    private:
389     F func_;
390     folly::Optional<folly::Try<Result>>& result_;
391   };
392
393   static constexpr bool allocateInBuffer =
394       sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
395 };
396
397 template <typename F, typename G>
398 void FiberManager::addTaskFinally(F&& func, G&& finally) {
399   typedef typename std::result_of<F()>::type Result;
400
401   static_assert(
402       IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
403       "finally(arg): arg must be Try<T>&&");
404   static_assert(
405       std::is_convertible<
406           Result,
407           typename std::remove_reference<
408               typename FirstArgOf<G>::type>::type::element_type>::value,
409       "finally(Try<T>&&): T must be convertible from func()'s return type");
410
411   auto fiber = getFiber();
412   initLocalData(*fiber);
413
414   typedef AddTaskFinallyHelper<
415       typename std::decay<F>::type,
416       typename std::decay<G>::type>
417       Helper;
418
419   if (Helper::allocateInBuffer) {
420     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
421     auto finallyLoc =
422         static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
423
424     new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
425     new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
426
427     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
428   } else {
429     auto finallyLoc =
430         new typename Helper::Finally(std::forward<G>(finally), *this);
431     auto funcLoc =
432         new typename Helper::Func(std::forward<F>(func), *finallyLoc);
433
434     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
435   }
436
437   readyFibers_.push_back(*fiber);
438   if (observer_) {
439     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
440   }
441
442   ensureLoopScheduled();
443 }
444
445 template <typename F>
446 typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
447   if (UNLIKELY(activeFiber_ == nullptr)) {
448     return func();
449   }
450
451   typedef typename std::result_of<F()>::type Result;
452
453   folly::Try<Result> result;
454   auto f = [&func, &result]() mutable {
455     result = folly::makeTryWith(std::forward<F>(func));
456   };
457
458   immediateFunc_ = std::ref(f);
459   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
460
461   return std::move(result).value();
462 }
463
464 inline FiberManager& FiberManager::getFiberManager() {
465   assert(currentFiberManager_ != nullptr);
466   return *currentFiberManager_;
467 }
468
469 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
470   return currentFiberManager_;
471 }
472
473 inline bool FiberManager::hasActiveFiber() const {
474   return activeFiber_ != nullptr;
475 }
476
477 inline void FiberManager::yield() {
478   assert(currentFiberManager_ == this);
479   assert(activeFiber_ != nullptr);
480   assert(activeFiber_->state_ == Fiber::RUNNING);
481   activeFiber_->preempt(Fiber::YIELDED);
482 }
483
484 template <typename T>
485 T& FiberManager::local() {
486   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
487     return currentFiber_->localData_.get<T>();
488   }
489   return localThread<T>();
490 }
491
492 template <typename T>
493 T& FiberManager::localThread() {
494 #ifndef __APPLE__
495   static thread_local T t;
496   return t;
497 #else // osx doesn't support thread_local
498   static ThreadLocal<T> t;
499   return *t;
500 #endif
501 }
502
503 inline void FiberManager::initLocalData(Fiber& fiber) {
504   auto fm = getFiberManagerUnsafe();
505   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
506     fiber.localData_ = fm->currentFiber_->localData_;
507   }
508   fiber.rcontext_ = RequestContext::saveContext();
509 }
510
511 template <typename LocalT>
512 FiberManager::FiberManager(
513     LocalType<LocalT>,
514     std::unique_ptr<LoopController> loopController__,
515     Options options)
516     : loopController_(std::move(loopController__)),
517       stackAllocator_(options.useGuardPages),
518       options_(preprocessOptions(std::move(options))),
519       exceptionCallback_([](std::exception_ptr eptr, std::string context) {
520         try {
521           std::rethrow_exception(eptr);
522         } catch (const std::exception& e) {
523           LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
524                       << e.what() << "' was thrown in "
525                       << "FiberManager with context '" << context << "'";
526         } catch (...) {
527           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
528                       << "context '" << context << "'";
529         }
530       }),
531       timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
532       fibersPoolResizer_(*this),
533       localType_(typeid(LocalT)) {
534   loopController_->setFiberManager(this);
535 }
536
537 template <typename F>
538 typename FirstArgOf<F>::type::value_type inline await(F&& func) {
539   typedef typename FirstArgOf<F>::type::value_type Result;
540   typedef typename FirstArgOf<F>::type::baton_type BatonT;
541
542   return Promise<Result, BatonT>::await(std::forward<F>(func));
543 }
544 }
545 }