Don't require folly::Unit in addTaskFinally
[folly.git] / folly / experimental / fibers / FiberManager-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
25 #ifdef __APPLE__
26 #include <folly/ThreadLocal.h>
27 #endif
28 #include <folly/experimental/fibers/Baton.h>
29 #include <folly/experimental/fibers/Fiber.h>
30 #include <folly/experimental/fibers/LoopController.h>
31 #include <folly/experimental/fibers/Promise.h>
32 #include <folly/futures/Promise.h>
33 #include <folly/futures/Try.h>
34
35 namespace folly {
36 namespace fibers {
37
38 namespace {
39
40 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
41 #ifdef FOLLY_SANITIZE_ADDRESS
42   /* ASAN needs a lot of extra stack space.
43      16x is a conservative estimate, 8x also worked with tests
44      where it mattered.  Note that overallocating here does not necessarily
45      increase RSS, since unused memory is pretty much free. */
46   opts.stackSize *= 16;
47 #endif
48   return opts;
49 }
50
51 } // anonymous
52
53 inline void FiberManager::ensureLoopScheduled() {
54   if (isLoopScheduled_) {
55     return;
56   }
57
58   isLoopScheduled_ = true;
59   loopController_->schedule();
60 }
61
62 inline intptr_t FiberManager::activateFiber(Fiber* fiber) {
63   DCHECK_EQ(activeFiber_, (Fiber*)nullptr);
64
65 #ifdef FOLLY_SANITIZE_ADDRESS
66   registerFiberActivationWithAsan(fiber);
67 #endif
68
69   activeFiber_ = fiber;
70   return jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
71 }
72
73 inline intptr_t FiberManager::deactivateFiber(Fiber* fiber) {
74   DCHECK_EQ(activeFiber_, fiber);
75
76 #ifdef FOLLY_SANITIZE_ADDRESS
77   registerFiberDeactivationWithAsan(fiber);
78 #endif
79
80   activeFiber_ = nullptr;
81   return jumpContext(&fiber->fcontext_, &mainContext_, 0);
82 }
83
84 inline void FiberManager::runReadyFiber(Fiber* fiber) {
85   SCOPE_EXIT {
86     assert(currentFiber_ == nullptr);
87     assert(activeFiber_ == nullptr);
88   };
89
90   assert(
91       fiber->state_ == Fiber::NOT_STARTED ||
92       fiber->state_ == Fiber::READY_TO_RUN);
93   currentFiber_ = fiber;
94   fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
95   if (observer_) {
96     observer_->starting(reinterpret_cast<uintptr_t>(fiber));
97   }
98
99   while (fiber->state_ == Fiber::NOT_STARTED ||
100          fiber->state_ == Fiber::READY_TO_RUN) {
101     activateFiber(fiber);
102     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
103       try {
104         immediateFunc_();
105       } catch (...) {
106         exceptionCallback_(std::current_exception(), "running immediateFunc_");
107       }
108       immediateFunc_ = nullptr;
109       fiber->state_ = Fiber::READY_TO_RUN;
110     }
111   }
112
113   if (fiber->state_ == Fiber::AWAITING) {
114     awaitFunc_(*fiber);
115     awaitFunc_ = nullptr;
116     if (observer_) {
117       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
118     }
119     currentFiber_ = nullptr;
120     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
121   } else if (fiber->state_ == Fiber::INVALID) {
122     assert(fibersActive_ > 0);
123     --fibersActive_;
124     // Making sure that task functor is deleted once task is complete.
125     // NOTE: we must do it on main context, as the fiber is not
126     // running at this point.
127     fiber->func_ = nullptr;
128     fiber->resultFunc_ = nullptr;
129     if (fiber->finallyFunc_) {
130       try {
131         fiber->finallyFunc_();
132       } catch (...) {
133         exceptionCallback_(std::current_exception(), "running finallyFunc_");
134       }
135       fiber->finallyFunc_ = nullptr;
136     }
137     // Make sure LocalData is not accessible from its destructor
138     if (observer_) {
139       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
140     }
141     currentFiber_ = nullptr;
142     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
143     fiber->localData_.reset();
144     fiber->rcontext_.reset();
145
146     if (fibersPoolSize_ < options_.maxFibersPoolSize ||
147         options_.fibersPoolResizePeriodMs > 0) {
148       fibersPool_.push_front(*fiber);
149       ++fibersPoolSize_;
150     } else {
151       delete fiber;
152       assert(fibersAllocated_ > 0);
153       --fibersAllocated_;
154     }
155   } else if (fiber->state_ == Fiber::YIELDED) {
156     if (observer_) {
157       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
158     }
159     currentFiber_ = nullptr;
160     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
161     fiber->state_ = Fiber::READY_TO_RUN;
162     yieldedFibers_.push_back(*fiber);
163   }
164 }
165
166 inline bool FiberManager::loopUntilNoReady() {
167   // Support nested FiberManagers
168   auto originalFiberManager = this;
169   std::swap(currentFiberManager_, originalFiberManager);
170
171   SCOPE_EXIT {
172     isLoopScheduled_ = false;
173     if (!readyFibers_.empty()) {
174       ensureLoopScheduled();
175     }
176     std::swap(currentFiberManager_, originalFiberManager);
177     CHECK_EQ(this, originalFiberManager);
178   };
179
180   bool hadRemoteFiber = true;
181   while (hadRemoteFiber) {
182     hadRemoteFiber = false;
183
184     while (!readyFibers_.empty()) {
185       auto& fiber = readyFibers_.front();
186       readyFibers_.pop_front();
187       runReadyFiber(&fiber);
188     }
189
190     remoteReadyQueue_.sweep([this, &hadRemoteFiber](Fiber* fiber) {
191       runReadyFiber(fiber);
192       hadRemoteFiber = true;
193     });
194
195     remoteTaskQueue_.sweep([this, &hadRemoteFiber](RemoteTask* taskPtr) {
196       std::unique_ptr<RemoteTask> task(taskPtr);
197       auto fiber = getFiber();
198       if (task->localData) {
199         fiber->localData_ = *task->localData;
200       }
201       fiber->rcontext_ = std::move(task->rcontext);
202
203       fiber->setFunction(std::move(task->func));
204       fiber->data_ = reinterpret_cast<intptr_t>(fiber);
205       if (observer_) {
206         observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
207       }
208       runReadyFiber(fiber);
209       hadRemoteFiber = true;
210     });
211   }
212
213   if (observer_) {
214     for (auto& yielded : yieldedFibers_) {
215       observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
216     }
217   }
218   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
219
220   return fibersActive_ > 0;
221 }
222
223 // We need this to be in a struct, not inlined in addTask, because clang crashes
224 // otherwise.
225 template <typename F>
226 struct FiberManager::AddTaskHelper {
227   class Func;
228
229   static constexpr bool allocateInBuffer =
230       sizeof(Func) <= Fiber::kUserBufferSize;
231
232   class Func {
233    public:
234     Func(F&& func, FiberManager& fm) : func_(std::forward<F>(func)), fm_(fm) {}
235
236     void operator()() {
237       try {
238         func_();
239       } catch (...) {
240         fm_.exceptionCallback_(
241             std::current_exception(), "running Func functor");
242       }
243       if (allocateInBuffer) {
244         this->~Func();
245       } else {
246         delete this;
247       }
248     }
249
250    private:
251     F func_;
252     FiberManager& fm_;
253   };
254 };
255
256 template <typename F>
257 void FiberManager::addTask(F&& func) {
258   typedef AddTaskHelper<F> Helper;
259
260   auto fiber = getFiber();
261   initLocalData(*fiber);
262
263   if (Helper::allocateInBuffer) {
264     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
265     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
266
267     fiber->setFunction(std::ref(*funcLoc));
268   } else {
269     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
270
271     fiber->setFunction(std::ref(*funcLoc));
272   }
273
274   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
275   readyFibers_.push_back(*fiber);
276   if (observer_) {
277     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
278   }
279
280   ensureLoopScheduled();
281 }
282
283 template <typename F>
284 auto FiberManager::addTaskFuture(F&& func) -> folly::Future<
285     typename folly::Unit::Lift<typename std::result_of<F()>::type>::type> {
286   using T = typename std::result_of<F()>::type;
287   using FutureT = typename folly::Unit::Lift<T>::type;
288
289   folly::Promise<FutureT> p;
290   auto f = p.getFuture();
291   addTaskFinally(
292       [func = std::forward<F>(func)]() mutable { return func(); },
293       [p = std::move(p)](folly::Try<T> && t) mutable {
294         p.setTry(std::move(t));
295       });
296   return f;
297 }
298
299 template <typename F>
300 void FiberManager::addTaskRemote(F&& func) {
301   auto task = [&]() {
302     auto currentFm = getFiberManagerUnsafe();
303     if (currentFm && currentFm->currentFiber_ &&
304         currentFm->localType_ == localType_) {
305       return folly::make_unique<RemoteTask>(
306           std::forward<F>(func), currentFm->currentFiber_->localData_);
307     }
308     return folly::make_unique<RemoteTask>(std::forward<F>(func));
309   }();
310   auto insertHead = [&]() {
311     return remoteTaskQueue_.insertHead(task.release());
312   };
313   loopController_->scheduleThreadSafe(std::ref(insertHead));
314 }
315
316 template <typename F>
317 auto FiberManager::addTaskRemoteFuture(F&& func) -> folly::Future<
318     typename folly::Unit::Lift<typename std::result_of<F()>::type>::type> {
319   folly::Promise<
320       typename folly::Unit::Lift<typename std::result_of<F()>::type>::type>
321       p;
322   auto f = p.getFuture();
323   addTaskRemote(
324       [ p = std::move(p), func = std::forward<F>(func), this ]() mutable {
325         auto t = folly::makeTryWith(std::forward<F>(func));
326         runInMainContext([&]() { p.setTry(std::move(t)); });
327       });
328   return f;
329 }
330
331 template <typename X>
332 struct IsRvalueRefTry {
333   static const bool value = false;
334 };
335 template <typename T>
336 struct IsRvalueRefTry<folly::Try<T>&&> {
337   static const bool value = true;
338 };
339
340 // We need this to be in a struct, not inlined in addTaskFinally, because clang
341 // crashes otherwise.
342 template <typename F, typename G>
343 struct FiberManager::AddTaskFinallyHelper {
344   class Func;
345
346   typedef typename std::result_of<F()>::type Result;
347
348   class Finally {
349    public:
350     Finally(G finally, FiberManager& fm)
351         : finally_(std::move(finally)), fm_(fm) {}
352
353     void operator()() {
354       try {
355         finally_(std::move(*result_));
356       } catch (...) {
357         fm_.exceptionCallback_(
358             std::current_exception(), "running Finally functor");
359       }
360
361       if (allocateInBuffer) {
362         this->~Finally();
363       } else {
364         delete this;
365       }
366     }
367
368    private:
369     friend class Func;
370
371     G finally_;
372     folly::Optional<folly::Try<Result>> result_;
373     FiberManager& fm_;
374   };
375
376   class Func {
377    public:
378     Func(F func, Finally& finally)
379         : func_(std::move(func)), result_(finally.result_) {}
380
381     void operator()() {
382       result_ = folly::makeTryWith(std::move(func_));
383
384       if (allocateInBuffer) {
385         this->~Func();
386       } else {
387         delete this;
388       }
389     }
390
391    private:
392     F func_;
393     folly::Optional<folly::Try<Result>>& result_;
394   };
395
396   static constexpr bool allocateInBuffer =
397       sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
398 };
399
400 template <typename F, typename G>
401 void FiberManager::addTaskFinally(F&& func, G&& finally) {
402   typedef typename std::result_of<F()>::type Result;
403
404   static_assert(
405       IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
406       "finally(arg): arg must be Try<T>&&");
407   static_assert(
408       std::is_convertible<
409           Result,
410           typename std::remove_reference<
411               typename FirstArgOf<G>::type>::type::element_type>::value,
412       "finally(Try<T>&&): T must be convertible from func()'s return type");
413
414   auto fiber = getFiber();
415   initLocalData(*fiber);
416
417   typedef AddTaskFinallyHelper<
418       typename std::decay<F>::type,
419       typename std::decay<G>::type>
420       Helper;
421
422   if (Helper::allocateInBuffer) {
423     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
424     auto finallyLoc =
425         static_cast<typename Helper::Finally*>(static_cast<void*>(funcLoc + 1));
426
427     new (finallyLoc) typename Helper::Finally(std::forward<G>(finally), *this);
428     new (funcLoc) typename Helper::Func(std::forward<F>(func), *finallyLoc);
429
430     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
431   } else {
432     auto finallyLoc =
433         new typename Helper::Finally(std::forward<G>(finally), *this);
434     auto funcLoc =
435         new typename Helper::Func(std::forward<F>(func), *finallyLoc);
436
437     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
438   }
439
440   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
441   readyFibers_.push_back(*fiber);
442   if (observer_) {
443     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
444   }
445
446   ensureLoopScheduled();
447 }
448
449 template <typename F>
450 typename std::result_of<F()>::type FiberManager::runInMainContext(F&& func) {
451   if (UNLIKELY(activeFiber_ == nullptr)) {
452     return func();
453   }
454
455   typedef typename std::result_of<F()>::type Result;
456
457   folly::Try<Result> result;
458   auto f = [&func, &result]() mutable {
459     result = folly::makeTryWith(std::forward<F>(func));
460   };
461
462   immediateFunc_ = std::ref(f);
463   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
464
465   return std::move(result).value();
466 }
467
468 inline FiberManager& FiberManager::getFiberManager() {
469   assert(currentFiberManager_ != nullptr);
470   return *currentFiberManager_;
471 }
472
473 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
474   return currentFiberManager_;
475 }
476
477 inline bool FiberManager::hasActiveFiber() const {
478   return activeFiber_ != nullptr;
479 }
480
481 inline void FiberManager::yield() {
482   assert(currentFiberManager_ == this);
483   assert(activeFiber_ != nullptr);
484   assert(activeFiber_->state_ == Fiber::RUNNING);
485   activeFiber_->preempt(Fiber::YIELDED);
486 }
487
488 template <typename T>
489 T& FiberManager::local() {
490   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
491     return currentFiber_->localData_.get<T>();
492   }
493   return localThread<T>();
494 }
495
496 template <typename T>
497 T& FiberManager::localThread() {
498 #ifndef __APPLE__
499   static thread_local T t;
500   return t;
501 #else // osx doesn't support thread_local
502   static ThreadLocal<T> t;
503   return *t;
504 #endif
505 }
506
507 inline void FiberManager::initLocalData(Fiber& fiber) {
508   auto fm = getFiberManagerUnsafe();
509   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
510     fiber.localData_ = fm->currentFiber_->localData_;
511   }
512   fiber.rcontext_ = RequestContext::saveContext();
513 }
514
515 template <typename LocalT>
516 FiberManager::FiberManager(
517     LocalType<LocalT>,
518     std::unique_ptr<LoopController> loopController__,
519     Options options)
520     : loopController_(std::move(loopController__)),
521       stackAllocator_(options.useGuardPages),
522       options_(preprocessOptions(std::move(options))),
523       exceptionCallback_([](std::exception_ptr eptr, std::string context) {
524         try {
525           std::rethrow_exception(eptr);
526         } catch (const std::exception& e) {
527           LOG(DFATAL) << "Exception " << typeid(e).name() << " with message '"
528                       << e.what() << "' was thrown in "
529                       << "FiberManager with context '" << context << "'";
530           throw;
531         } catch (...) {
532           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
533                       << "context '" << context << "'";
534           throw;
535         }
536       }),
537       timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
538       fibersPoolResizer_(*this),
539       localType_(typeid(LocalT)) {
540   loopController_->setFiberManager(this);
541 }
542
543 template <typename F>
544 typename FirstArgOf<F>::type::value_type inline await(F&& func) {
545   typedef typename FirstArgOf<F>::type::value_type Result;
546
547   folly::Try<Result> result;
548
549   Baton baton;
550   baton.wait([&func, &result, &baton]() mutable {
551     func(Promise<Result>(result, baton));
552   });
553
554   return folly::moveFromTry(result);
555 }
556 }
557 }