ff40265e76af2d83f3a9b12e44437440de2482f3
[folly.git] / folly / experimental / fibers / FiberManager-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
25 #ifdef __APPLE__
26 #include <folly/ThreadLocal.h>
27 #endif
28 #include <folly/experimental/fibers/Baton.h>
29 #include <folly/experimental/fibers/Fiber.h>
30 #include <folly/experimental/fibers/LoopController.h>
31 #include <folly/experimental/fibers/Promise.h>
32 #include <folly/futures/Promise.h>
33 #include <folly/futures/Try.h>
34
35 namespace folly { namespace fibers {
36
37 namespace {
38
39 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
40 #ifdef FOLLY_SANITIZE_ADDRESS
41   /* ASAN needs a lot of extra stack space.
42      16x is a conservative estimate, 8x also worked with tests
43      where it mattered.  Note that overallocating here does not necessarily
44      increase RSS, since unused memory is pretty much free. */
45   opts.stackSize *= 16;
46 #endif
47   return opts;
48 }
49
50 }  // anonymous
51
52 inline void FiberManager::ensureLoopScheduled() {
53   if (isLoopScheduled_) {
54     return;
55   }
56
57   isLoopScheduled_ = true;
58   loopController_->schedule();
59 }
60
61 inline intptr_t FiberManager::activateFiber(Fiber* fiber) {
62   DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
63
64 #ifdef FOLLY_SANITIZE_ADDRESS
65   registerFiberActivationWithAsan(fiber);
66 #endif
67
68   activeFiber_ = fiber;
69   return jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
70 }
71
72 inline intptr_t FiberManager::deactivateFiber(Fiber* fiber) {
73   DCHECK_EQ(activeFiber_, fiber);
74
75 #ifdef FOLLY_SANITIZE_ADDRESS
76   registerFiberDeactivationWithAsan(fiber);
77 #endif
78
79   activeFiber_ = nullptr;
80   return jumpContext(&fiber->fcontext_, &mainContext_, 0);
81 }
82
83 inline void FiberManager::runReadyFiber(Fiber* fiber) {
84   SCOPE_EXIT {
85     assert(currentFiber_ == nullptr);
86     assert(activeFiber_ == nullptr);
87   };
88
89   assert(fiber->state_ == Fiber::NOT_STARTED ||
90          fiber->state_ == Fiber::READY_TO_RUN);
91   currentFiber_ = fiber;
92   fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
93   if (observer_) {
94     observer_->starting(reinterpret_cast<uintptr_t>(fiber));
95   }
96
97   while (fiber->state_ == Fiber::NOT_STARTED ||
98          fiber->state_ == Fiber::READY_TO_RUN) {
99     activateFiber(fiber);
100     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
101       try {
102         immediateFunc_();
103       } catch (...) {
104         exceptionCallback_(std::current_exception(), "running immediateFunc_");
105       }
106       immediateFunc_ = nullptr;
107       fiber->state_ = Fiber::READY_TO_RUN;
108     }
109   }
110
111   if (fiber->state_ == Fiber::AWAITING) {
112     awaitFunc_(*fiber);
113     awaitFunc_ = nullptr;
114     if (observer_) {
115       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
116     }
117     currentFiber_ = nullptr;
118     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
119   } else if (fiber->state_ == Fiber::INVALID) {
120     assert(fibersActive_ > 0);
121     --fibersActive_;
122     // Making sure that task functor is deleted once task is complete.
123     // NOTE: we must do it on main context, as the fiber is not
124     // running at this point.
125     fiber->func_ = nullptr;
126     fiber->resultFunc_ = nullptr;
127     if (fiber->finallyFunc_) {
128       try {
129         fiber->finallyFunc_();
130       } catch (...) {
131         exceptionCallback_(std::current_exception(), "running finallyFunc_");
132       }
133       fiber->finallyFunc_ = nullptr;
134     }
135     // Make sure LocalData is not accessible from its destructor
136     if (observer_) {
137       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
138     }
139     currentFiber_ = nullptr;
140     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
141     fiber->localData_.reset();
142     fiber->rcontext_.reset();
143
144     if (fibersPoolSize_ < options_.maxFibersPoolSize ||
145         options_.fibersPoolResizePeriodMs > 0) {
146       fibersPool_.push_front(*fiber);
147       ++fibersPoolSize_;
148     } else {
149       delete fiber;
150       assert(fibersAllocated_ > 0);
151       --fibersAllocated_;
152     }
153   } else if (fiber->state_ == Fiber::YIELDED) {
154     if (observer_) {
155       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
156     }
157     currentFiber_ = nullptr;
158     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
159     fiber->state_ = Fiber::READY_TO_RUN;
160     yieldedFibers_.push_back(*fiber);
161   }
162 }
163
164 inline bool FiberManager::loopUntilNoReady() {
165   // Support nested FiberManagers
166   auto originalFiberManager = this;
167   std::swap(currentFiberManager_, originalFiberManager);
168
169   SCOPE_EXIT {
170     isLoopScheduled_ = false;
171     if (!readyFibers_.empty()) {
172       ensureLoopScheduled();
173     }
174     std::swap(currentFiberManager_, originalFiberManager);
175     CHECK_EQ(this, originalFiberManager);
176   };
177
178   bool hadRemoteFiber = true;
179   while (hadRemoteFiber) {
180     hadRemoteFiber = false;
181
182     while (!readyFibers_.empty()) {
183       auto& fiber = readyFibers_.front();
184       readyFibers_.pop_front();
185       runReadyFiber(&fiber);
186     }
187
188     remoteReadyQueue_.sweep(
189       [this, &hadRemoteFiber] (Fiber* fiber) {
190         runReadyFiber(fiber);
191         hadRemoteFiber = true;
192       }
193     );
194
195     remoteTaskQueue_.sweep(
196       [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
197         std::unique_ptr<RemoteTask> task(taskPtr);
198         auto fiber = getFiber();
199         if (task->localData) {
200           fiber->localData_ = *task->localData;
201         }
202         fiber->rcontext_ = std::move(task->rcontext);
203
204         fiber->setFunction(std::move(task->func));
205         fiber->data_ = reinterpret_cast<intptr_t>(fiber);
206         if (observer_) {
207           observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
208         }
209         runReadyFiber(fiber);
210         hadRemoteFiber = true;
211       }
212     );
213   }
214
215   if (observer_) {
216     for (auto& yielded : yieldedFibers_) {
217       observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
218     }
219   }
220   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
221
222   return fibersActive_ > 0;
223 }
224
225 // We need this to be in a struct, not inlined in addTask, because clang crashes
226 // otherwise.
227 template <typename F>
228 struct FiberManager::AddTaskHelper {
229   class Func;
230
231   static constexpr bool allocateInBuffer =
232     sizeof(Func) <= Fiber::kUserBufferSize;
233
234   class Func {
235    public:
236     Func(F&& func, FiberManager& fm) :
237         func_(std::forward<F>(func)), fm_(fm) {}
238
239     void operator()() {
240       try {
241         func_();
242       } catch (...) {
243         fm_.exceptionCallback_(std::current_exception(),
244                                "running Func functor");
245       }
246       if (allocateInBuffer) {
247         this->~Func();
248       } else {
249         delete this;
250       }
251     }
252
253    private:
254     F func_;
255     FiberManager& fm_;
256   };
257 };
258
259 template <typename F>
260 void FiberManager::addTask(F&& func) {
261   typedef AddTaskHelper<F> Helper;
262
263   auto fiber = getFiber();
264   initLocalData(*fiber);
265
266   if (Helper::allocateInBuffer) {
267     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
268     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
269
270     fiber->setFunction(std::ref(*funcLoc));
271   } else {
272     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
273
274     fiber->setFunction(std::ref(*funcLoc));
275   }
276
277   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
278   readyFibers_.push_back(*fiber);
279   if (observer_) {
280     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
281   }
282
283   ensureLoopScheduled();
284 }
285
286 template <typename F>
287 auto FiberManager::addTaskFuture(F&& func)
288     -> folly::Future<typename std::result_of<F()>::type> {
289   using T = typename std::result_of<F()>::type;
290   folly::Promise<T> p;
291   auto f = p.getFuture();
292   addTaskFinally([func = std::forward<F>(func)]() mutable { return func(); },
293                  [p = std::move(p)](folly::Try<T> && t) mutable {
294                    p.setTry(std::move(t));
295                  });
296   return f;
297 }
298
299 template <typename F>
300 void FiberManager::addTaskRemote(F&& func) {
301   // addTaskRemote indirectly requires wrapping the function in a
302   // std::function, which must be copyable. As move-only lambdas may be
303   // passed in we wrap it first in a move wrapper and then capture the wrapped
304   // version.
305   auto functionWrapper = [f = folly::makeMoveWrapper(
306                               std::forward<F>(func))]() mutable {
307     return (*f)();
308   };
309   auto task = [&]() {
310     auto currentFm = getFiberManagerUnsafe();
311     if (currentFm &&
312         currentFm->currentFiber_ &&
313         currentFm->localType_ == localType_) {
314       return folly::make_unique<RemoteTask>(
315           std::move(functionWrapper), currentFm->currentFiber_->localData_);
316     }
317     return folly::make_unique<RemoteTask>(std::move(functionWrapper));
318   }();
319   auto insertHead =
320       [&]() { return remoteTaskQueue_.insertHead(task.release()); };
321   loopController_->scheduleThreadSafe(std::ref(insertHead));
322 }
323
324 template <typename F>
325 auto FiberManager::addTaskRemoteFuture(F&& func)
326     -> folly::Future<typename std::result_of<F()>::type> {
327   folly::Promise<typename std::result_of<F()>::type> p;
328   auto f = p.getFuture();
329   addTaskRemote(
330       [ p = std::move(p), func = std::forward<F>(func), this ]() mutable {
331         auto t = folly::makeTryWith(std::forward<F>(func));
332         runInMainContext([&]() { p.setTry(std::move(t)); });
333       });
334   return f;
335 }
336
337 template <typename X>
338 struct IsRvalueRefTry { static const bool value = false; };
339 template <typename T>
340 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
341
342 // We need this to be in a struct, not inlined in addTaskFinally, because clang
343 // crashes otherwise.
344 template <typename F, typename G>
345 struct FiberManager::AddTaskFinallyHelper {
346   class Func;
347   class Finally;
348
349   typedef typename std::result_of<F()>::type Result;
350
351   static constexpr bool allocateInBuffer =
352     sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
353
354   class Finally {
355    public:
356     Finally(G&& finally,
357             FiberManager& fm) :
358         finally_(std::forward<G>(finally)),
359         fm_(fm) {
360     }
361
362     void operator()() {
363       try {
364         finally_(std::move(*result_));
365       } catch (...) {
366         fm_.exceptionCallback_(std::current_exception(),
367                                "running Finally functor");
368       }
369
370       if (allocateInBuffer) {
371         this->~Finally();
372       } else {
373         delete this;
374       }
375     }
376
377    private:
378     friend class Func;
379
380     G finally_;
381     folly::Optional<folly::Try<Result>> result_;
382     FiberManager& fm_;
383   };
384
385   class Func {
386    public:
387     Func(F&& func, Finally& finally) :
388         func_(std::move(func)), result_(finally.result_) {}
389
390     void operator()() {
391       result_ = folly::makeTryWith(std::move(func_));
392
393       if (allocateInBuffer) {
394         this->~Func();
395       } else {
396         delete this;
397       }
398     }
399
400    private:
401     F func_;
402     folly::Optional<folly::Try<Result>>& result_;
403   };
404 };
405
406 template <typename F, typename G>
407 void FiberManager::addTaskFinally(F&& func, G&& finally) {
408   typedef typename std::result_of<F()>::type Result;
409
410   static_assert(
411     IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
412     "finally(arg): arg must be Try<T>&&");
413   static_assert(
414     std::is_convertible<
415       Result,
416       typename std::remove_reference<
417         typename FirstArgOf<G>::type
418       >::type::element_type
419     >::value,
420     "finally(Try<T>&&): T must be convertible from func()'s return type");
421
422   auto fiber = getFiber();
423   initLocalData(*fiber);
424
425   typedef AddTaskFinallyHelper<F,G> Helper;
426
427   if (Helper::allocateInBuffer) {
428     auto funcLoc = static_cast<typename Helper::Func*>(
429       fiber->getUserBuffer());
430     auto finallyLoc = static_cast<typename Helper::Finally*>(
431       static_cast<void*>(funcLoc + 1));
432
433     new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
434     new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
435
436     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
437   } else {
438     auto finallyLoc =
439         new typename Helper::Finally(std::forward<G>(finally), *this);
440     auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
441
442     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
443   }
444
445   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
446   readyFibers_.push_back(*fiber);
447   if (observer_) {
448     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
449   }
450
451   ensureLoopScheduled();
452 }
453
454 template <typename F>
455 typename std::result_of<F()>::type
456 FiberManager::runInMainContext(F&& func) {
457   if (UNLIKELY(activeFiber_ == nullptr)) {
458     return func();
459   }
460
461   typedef typename std::result_of<F()>::type Result;
462
463   folly::Try<Result> result;
464   auto f = [&func, &result]() mutable {
465     result = folly::makeTryWith(std::forward<F>(func));
466   };
467
468   immediateFunc_ = std::ref(f);
469   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
470
471   return std::move(result).value();
472 }
473
474 inline FiberManager& FiberManager::getFiberManager() {
475   assert(currentFiberManager_ != nullptr);
476   return *currentFiberManager_;
477 }
478
479 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
480   return currentFiberManager_;
481 }
482
483 inline bool FiberManager::hasActiveFiber() const {
484   return activeFiber_ != nullptr;
485 }
486
487 inline void FiberManager::yield() {
488   assert(currentFiberManager_ == this);
489   assert(activeFiber_ != nullptr);
490   assert(activeFiber_->state_ == Fiber::RUNNING);
491   activeFiber_->preempt(Fiber::YIELDED);
492 }
493
494 template <typename T>
495 T& FiberManager::local() {
496   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
497     return currentFiber_->localData_.get<T>();
498   }
499   return localThread<T>();
500 }
501
502 template <typename T>
503 T& FiberManager::localThread() {
504 #ifndef __APPLE__
505   static thread_local T t;
506   return t;
507 #else // osx doesn't support thread_local
508   static ThreadLocal<T> t;
509   return *t;
510 #endif
511 }
512
513 inline void FiberManager::initLocalData(Fiber& fiber) {
514   auto fm = getFiberManagerUnsafe();
515   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
516     fiber.localData_ = fm->currentFiber_->localData_;
517   }
518   fiber.rcontext_ = RequestContext::saveContext();
519 }
520
521 template <typename LocalT>
522 FiberManager::FiberManager(
523   LocalType<LocalT>,
524   std::unique_ptr<LoopController> loopController__,
525   Options options)  :
526     loopController_(std::move(loopController__)),
527     stackAllocator_(options.useGuardPages),
528     options_(preprocessOptions(std::move(options))),
529     exceptionCallback_([](std::exception_ptr eptr, std::string context) {
530         try {
531           std::rethrow_exception(eptr);
532         } catch (const std::exception& e) {
533           LOG(DFATAL) << "Exception " << typeid(e).name()
534                       << " with message '" << e.what() << "' was thrown in "
535                       << "FiberManager with context '" << context << "'";
536           throw;
537         } catch (...) {
538           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
539                       << "context '" << context << "'";
540           throw;
541         }
542       }),
543     timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
544     fibersPoolResizer_(*this),
545     localType_(typeid(LocalT)) {
546   loopController_->setFiberManager(this);
547 }
548
549 template <typename F>
550 typename FirstArgOf<F>::type::value_type
551 inline await(F&& func) {
552   typedef typename FirstArgOf<F>::type::value_type Result;
553
554   folly::Try<Result> result;
555
556   Baton baton;
557   baton.wait([&func, &result, &baton]() mutable {
558       func(Promise<Result>(result, baton));
559     });
560
561   return folly::moveFromTry(result);
562 }
563
564 }}