cd73eb176d650f3e64281b75177a739c2aa08774
[folly.git] / folly / experimental / fibers / FiberManager-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
25 #ifdef __APPLE__
26 #include <folly/ThreadLocal.h>
27 #endif
28 #include <folly/experimental/fibers/Baton.h>
29 #include <folly/experimental/fibers/Fiber.h>
30 #include <folly/experimental/fibers/LoopController.h>
31 #include <folly/experimental/fibers/Promise.h>
32 #include <folly/futures/Try.h>
33
34 namespace folly { namespace fibers {
35
36 namespace {
37
38 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
39 #ifdef FOLLY_SANITIZE_ADDRESS
40   /* ASAN needs a lot of extra stack space.
41      16x is a conservative estimate, 8x also worked with tests
42      where it mattered.  Note that overallocating here does not necessarily
43      increase RSS, since unused memory is pretty much free. */
44   opts.stackSize *= 16;
45 #endif
46   return opts;
47 }
48
49 }  // anonymous
50
51 inline void FiberManager::ensureLoopScheduled() {
52   if (isLoopScheduled_) {
53     return;
54   }
55
56   isLoopScheduled_ = true;
57   loopController_->schedule();
58 }
59
60 inline void FiberManager::runReadyFiber(Fiber* fiber) {
61   SCOPE_EXIT {
62     assert(currentFiber_ == nullptr);
63     assert(activeFiber_ == nullptr);
64   };
65
66   assert(fiber->state_ == Fiber::NOT_STARTED ||
67          fiber->state_ == Fiber::READY_TO_RUN);
68   currentFiber_ = fiber;
69   fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
70   if (observer_) {
71     observer_->starting(reinterpret_cast<uintptr_t>(fiber));
72   }
73
74   while (fiber->state_ == Fiber::NOT_STARTED ||
75          fiber->state_ == Fiber::READY_TO_RUN) {
76     activeFiber_ = fiber;
77     jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
78     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
79       try {
80         immediateFunc_();
81       } catch (...) {
82         exceptionCallback_(std::current_exception(), "running immediateFunc_");
83       }
84       immediateFunc_ = nullptr;
85       fiber->state_ = Fiber::READY_TO_RUN;
86     }
87   }
88
89   if (fiber->state_ == Fiber::AWAITING) {
90     awaitFunc_(*fiber);
91     awaitFunc_ = nullptr;
92     if (observer_) {
93       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
94     }
95     currentFiber_ = nullptr;
96     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
97   } else if (fiber->state_ == Fiber::INVALID) {
98     assert(fibersActive_ > 0);
99     --fibersActive_;
100     // Making sure that task functor is deleted once task is complete.
101     // NOTE: we must do it on main context, as the fiber is not
102     // running at this point.
103     fiber->func_ = nullptr;
104     fiber->resultFunc_ = nullptr;
105     if (fiber->finallyFunc_) {
106       try {
107         fiber->finallyFunc_();
108       } catch (...) {
109         exceptionCallback_(std::current_exception(), "running finallyFunc_");
110       }
111       fiber->finallyFunc_ = nullptr;
112     }
113     // Make sure LocalData is not accessible from its destructor
114     if (observer_) {
115       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
116     }
117     currentFiber_ = nullptr;
118     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
119     fiber->localData_.reset();
120     fiber->rcontext_.reset();
121
122     if (fibersPoolSize_ < options_.maxFibersPoolSize ||
123         options_.fibersPoolResizePeriodMs > 0) {
124       fibersPool_.push_front(*fiber);
125       ++fibersPoolSize_;
126     } else {
127       delete fiber;
128       assert(fibersAllocated_ > 0);
129       --fibersAllocated_;
130     }
131   } else if (fiber->state_ == Fiber::YIELDED) {
132     if (observer_) {
133       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
134     }
135     currentFiber_ = nullptr;
136     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
137     fiber->state_ = Fiber::READY_TO_RUN;
138     yieldedFibers_.push_back(*fiber);
139   }
140 }
141
142 inline bool FiberManager::loopUntilNoReady() {
143   SCOPE_EXIT {
144     isLoopScheduled_ = false;
145     if (!readyFibers_.empty()) {
146       ensureLoopScheduled();
147     }
148     currentFiberManager_ = nullptr;
149   };
150
151   currentFiberManager_ = this;
152
153   bool hadRemoteFiber = true;
154   while (hadRemoteFiber) {
155     hadRemoteFiber = false;
156
157     while (!readyFibers_.empty()) {
158       auto& fiber = readyFibers_.front();
159       readyFibers_.pop_front();
160       runReadyFiber(&fiber);
161     }
162
163     remoteReadyQueue_.sweep(
164       [this, &hadRemoteFiber] (Fiber* fiber) {
165         runReadyFiber(fiber);
166         hadRemoteFiber = true;
167       }
168     );
169
170     remoteTaskQueue_.sweep(
171       [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
172         std::unique_ptr<RemoteTask> task(taskPtr);
173         auto fiber = getFiber();
174         if (task->localData) {
175           fiber->localData_ = *task->localData;
176         }
177         fiber->rcontext_ = std::move(task->rcontext);
178
179         fiber->setFunction(std::move(task->func));
180         fiber->data_ = reinterpret_cast<intptr_t>(fiber);
181         if (observer_) {
182           observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
183         }
184         runReadyFiber(fiber);
185         hadRemoteFiber = true;
186       }
187     );
188   }
189
190   if (observer_) {
191     for (auto& yielded : yieldedFibers_) {
192       observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
193     }
194   }
195   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
196
197   return fibersActive_ > 0;
198 }
199
200 // We need this to be in a struct, not inlined in addTask, because clang crashes
201 // otherwise.
202 template <typename F>
203 struct FiberManager::AddTaskHelper {
204   class Func;
205
206   static constexpr bool allocateInBuffer =
207     sizeof(Func) <= Fiber::kUserBufferSize;
208
209   class Func {
210    public:
211     Func(F&& func, FiberManager& fm) :
212         func_(std::forward<F>(func)), fm_(fm) {}
213
214     void operator()() {
215       try {
216         func_();
217       } catch (...) {
218         fm_.exceptionCallback_(std::current_exception(),
219                                "running Func functor");
220       }
221       if (allocateInBuffer) {
222         this->~Func();
223       } else {
224         delete this;
225       }
226     }
227
228    private:
229     F func_;
230     FiberManager& fm_;
231   };
232 };
233
234 template <typename F>
235 void FiberManager::addTask(F&& func) {
236   typedef AddTaskHelper<F> Helper;
237
238   auto fiber = getFiber();
239   initLocalData(*fiber);
240
241   if (Helper::allocateInBuffer) {
242     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
243     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
244
245     fiber->setFunction(std::ref(*funcLoc));
246   } else {
247     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
248
249     fiber->setFunction(std::ref(*funcLoc));
250   }
251
252   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
253   readyFibers_.push_back(*fiber);
254   if (observer_) {
255     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
256   }
257
258   ensureLoopScheduled();
259 }
260
261 template <typename F>
262 void FiberManager::addTaskRemote(F&& func) {
263   auto task = [&]() {
264     auto currentFm = getFiberManagerUnsafe();
265     if (currentFm &&
266         currentFm->currentFiber_ &&
267         currentFm->localType_ == localType_) {
268       return folly::make_unique<RemoteTask>(
269         std::forward<F>(func),
270         currentFm->currentFiber_->localData_);
271     }
272     return folly::make_unique<RemoteTask>(std::forward<F>(func));
273   }();
274   auto insertHead =
275       [&]() { return remoteTaskQueue_.insertHead(task.release()); };
276   loopController_->scheduleThreadSafe(std::ref(insertHead));
277 }
278
279 template <typename X>
280 struct IsRvalueRefTry { static const bool value = false; };
281 template <typename T>
282 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
283
284 // We need this to be in a struct, not inlined in addTaskFinally, because clang
285 // crashes otherwise.
286 template <typename F, typename G>
287 struct FiberManager::AddTaskFinallyHelper {
288   class Func;
289   class Finally;
290
291   typedef typename std::result_of<F()>::type Result;
292
293   static constexpr bool allocateInBuffer =
294     sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
295
296   class Finally {
297    public:
298     Finally(G&& finally,
299             FiberManager& fm) :
300         finally_(std::move(finally)),
301         fm_(fm) {
302     }
303
304     void operator()() {
305       try {
306         finally_(std::move(*result_));
307       } catch (...) {
308         fm_.exceptionCallback_(std::current_exception(),
309                                "running Finally functor");
310       }
311
312       if (allocateInBuffer) {
313         this->~Finally();
314       } else {
315         delete this;
316       }
317     }
318
319    private:
320     friend class Func;
321
322     G finally_;
323     folly::Optional<folly::Try<Result>> result_;
324     FiberManager& fm_;
325   };
326
327   class Func {
328    public:
329     Func(F&& func, Finally& finally) :
330         func_(std::move(func)), result_(finally.result_) {}
331
332     void operator()() {
333       result_ = folly::makeTryWith(std::move(func_));
334
335       if (allocateInBuffer) {
336         this->~Func();
337       } else {
338         delete this;
339       }
340     }
341
342    private:
343     F func_;
344     folly::Optional<folly::Try<Result>>& result_;
345   };
346 };
347
348 template <typename F, typename G>
349 void FiberManager::addTaskFinally(F&& func, G&& finally) {
350   typedef typename std::result_of<F()>::type Result;
351
352   static_assert(
353     IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
354     "finally(arg): arg must be Try<T>&&");
355   static_assert(
356     std::is_convertible<
357       Result,
358       typename std::remove_reference<
359         typename FirstArgOf<G>::type
360       >::type::element_type
361     >::value,
362     "finally(Try<T>&&): T must be convertible from func()'s return type");
363
364   auto fiber = getFiber();
365   initLocalData(*fiber);
366
367   typedef AddTaskFinallyHelper<F,G> Helper;
368
369   if (Helper::allocateInBuffer) {
370     auto funcLoc = static_cast<typename Helper::Func*>(
371       fiber->getUserBuffer());
372     auto finallyLoc = static_cast<typename Helper::Finally*>(
373       static_cast<void*>(funcLoc + 1));
374
375     new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
376     new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
377
378     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
379   } else {
380     auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
381     auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
382
383     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
384   }
385
386   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
387   readyFibers_.push_back(*fiber);
388   if (observer_) {
389     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
390   }
391
392   ensureLoopScheduled();
393 }
394
395 template <typename F>
396 typename std::result_of<F()>::type
397 FiberManager::runInMainContext(F&& func) {
398   return runInMainContextHelper(std::forward<F>(func));
399 }
400
401 template <typename F>
402 inline typename std::enable_if<
403   !std::is_same<typename std::result_of<F()>::type, void>::value,
404   typename std::result_of<F()>::type>::type
405 FiberManager::runInMainContextHelper(F&& func) {
406   if (UNLIKELY(activeFiber_ == nullptr)) {
407     return func();
408   }
409
410   typedef typename std::result_of<F()>::type Result;
411
412   folly::Try<Result> result;
413   auto f = [&func, &result]() mutable {
414     result = folly::makeTryWith(std::forward<F>(func));
415   };
416
417   immediateFunc_ = std::ref(f);
418   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
419
420   return std::move(result.value());
421 }
422
423 template <typename F>
424 inline typename std::enable_if<
425   std::is_same<typename std::result_of<F()>::type, void>::value,
426   void>::type
427 FiberManager::runInMainContextHelper(F&& func) {
428   if (UNLIKELY(activeFiber_ == nullptr)) {
429     func();
430     return;
431   }
432
433   immediateFunc_ = std::ref(func);
434   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
435 }
436
437 inline FiberManager& FiberManager::getFiberManager() {
438   assert(currentFiberManager_ != nullptr);
439   return *currentFiberManager_;
440 }
441
442 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
443   return currentFiberManager_;
444 }
445
446 inline bool FiberManager::hasActiveFiber() const {
447   return activeFiber_ != nullptr;
448 }
449
450 inline void FiberManager::yield() {
451   assert(currentFiberManager_ == this);
452   assert(activeFiber_ != nullptr);
453   assert(activeFiber_->state_ == Fiber::RUNNING);
454   activeFiber_->preempt(Fiber::YIELDED);
455 }
456
457 template <typename T>
458 T& FiberManager::local() {
459   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
460     return currentFiber_->localData_.get<T>();
461   }
462   return localThread<T>();
463 }
464
465 template <typename T>
466 T& FiberManager::localThread() {
467 #ifndef __APPLE__
468   static thread_local T t;
469   return t;
470 #else // osx doesn't support thread_local
471   static ThreadLocal<T> t;
472   return *t;
473 #endif
474 }
475
476 inline void FiberManager::initLocalData(Fiber& fiber) {
477   auto fm = getFiberManagerUnsafe();
478   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
479     fiber.localData_ = fm->currentFiber_->localData_;
480   }
481   fiber.rcontext_ = RequestContext::saveContext();
482 }
483
484 template <typename LocalT>
485 FiberManager::FiberManager(
486   LocalType<LocalT>,
487   std::unique_ptr<LoopController> loopController__,
488   Options options)  :
489     loopController_(std::move(loopController__)),
490     stackAllocator_(options.useGuardPages),
491     options_(preprocessOptions(std::move(options))),
492     exceptionCallback_([](std::exception_ptr eptr, std::string context) {
493         try {
494           std::rethrow_exception(eptr);
495         } catch (const std::exception& e) {
496           LOG(DFATAL) << "Exception " << typeid(e).name()
497                       << " with message '" << e.what() << "' was thrown in "
498                       << "FiberManager with context '" << context << "'";
499           throw;
500         } catch (...) {
501           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
502                       << "context '" << context << "'";
503           throw;
504         }
505       }),
506     timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
507     fibersPoolResizer_(*this),
508     localType_(typeid(LocalT)) {
509   loopController_->setFiberManager(this);
510 }
511
512 template <typename F>
513 typename FirstArgOf<F>::type::value_type
514 inline await(F&& func) {
515   typedef typename FirstArgOf<F>::type::value_type Result;
516
517   folly::Try<Result> result;
518
519   Baton baton;
520   baton.wait([&func, &result, &baton]() mutable {
521       func(Promise<Result>(result, baton));
522     });
523
524   return folly::moveFromTry(result);
525 }
526
527 }}