Adding addTaskFuture and addTaskRemoteFuture to FiberManager.
[folly.git] / folly / experimental / fibers / FiberManager-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/CPortability.h>
21 #include <folly/Memory.h>
22 #include <folly/Optional.h>
23 #include <folly/Portability.h>
24 #include <folly/ScopeGuard.h>
25 #ifdef __APPLE__
26 #include <folly/ThreadLocal.h>
27 #endif
28 #include <folly/experimental/fibers/Baton.h>
29 #include <folly/experimental/fibers/Fiber.h>
30 #include <folly/experimental/fibers/LoopController.h>
31 #include <folly/experimental/fibers/Promise.h>
32 #include <folly/futures/Promise.h>
33 #include <folly/futures/Try.h>
34
35 namespace folly { namespace fibers {
36
37 namespace {
38
39 inline FiberManager::Options preprocessOptions(FiberManager::Options opts) {
40 #ifdef FOLLY_SANITIZE_ADDRESS
41   /* ASAN needs a lot of extra stack space.
42      16x is a conservative estimate, 8x also worked with tests
43      where it mattered.  Note that overallocating here does not necessarily
44      increase RSS, since unused memory is pretty much free. */
45   opts.stackSize *= 16;
46 #endif
47   return opts;
48 }
49
50 }  // anonymous
51
52 inline void FiberManager::ensureLoopScheduled() {
53   if (isLoopScheduled_) {
54     return;
55   }
56
57   isLoopScheduled_ = true;
58   loopController_->schedule();
59 }
60
61 inline void FiberManager::runReadyFiber(Fiber* fiber) {
62   SCOPE_EXIT {
63     assert(currentFiber_ == nullptr);
64     assert(activeFiber_ == nullptr);
65   };
66
67   assert(fiber->state_ == Fiber::NOT_STARTED ||
68          fiber->state_ == Fiber::READY_TO_RUN);
69   currentFiber_ = fiber;
70   fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
71   if (observer_) {
72     observer_->starting(reinterpret_cast<uintptr_t>(fiber));
73   }
74
75   while (fiber->state_ == Fiber::NOT_STARTED ||
76          fiber->state_ == Fiber::READY_TO_RUN) {
77     activeFiber_ = fiber;
78     jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
79     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
80       try {
81         immediateFunc_();
82       } catch (...) {
83         exceptionCallback_(std::current_exception(), "running immediateFunc_");
84       }
85       immediateFunc_ = nullptr;
86       fiber->state_ = Fiber::READY_TO_RUN;
87     }
88   }
89
90   if (fiber->state_ == Fiber::AWAITING) {
91     awaitFunc_(*fiber);
92     awaitFunc_ = nullptr;
93     if (observer_) {
94       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
95     }
96     currentFiber_ = nullptr;
97     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
98   } else if (fiber->state_ == Fiber::INVALID) {
99     assert(fibersActive_ > 0);
100     --fibersActive_;
101     // Making sure that task functor is deleted once task is complete.
102     // NOTE: we must do it on main context, as the fiber is not
103     // running at this point.
104     fiber->func_ = nullptr;
105     fiber->resultFunc_ = nullptr;
106     if (fiber->finallyFunc_) {
107       try {
108         fiber->finallyFunc_();
109       } catch (...) {
110         exceptionCallback_(std::current_exception(), "running finallyFunc_");
111       }
112       fiber->finallyFunc_ = nullptr;
113     }
114     // Make sure LocalData is not accessible from its destructor
115     if (observer_) {
116       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
117     }
118     currentFiber_ = nullptr;
119     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
120     fiber->localData_.reset();
121     fiber->rcontext_.reset();
122
123     if (fibersPoolSize_ < options_.maxFibersPoolSize ||
124         options_.fibersPoolResizePeriodMs > 0) {
125       fibersPool_.push_front(*fiber);
126       ++fibersPoolSize_;
127     } else {
128       delete fiber;
129       assert(fibersAllocated_ > 0);
130       --fibersAllocated_;
131     }
132   } else if (fiber->state_ == Fiber::YIELDED) {
133     if (observer_) {
134       observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
135     }
136     currentFiber_ = nullptr;
137     fiber->rcontext_ = RequestContext::setContext(std::move(fiber->rcontext_));
138     fiber->state_ = Fiber::READY_TO_RUN;
139     yieldedFibers_.push_back(*fiber);
140   }
141 }
142
143 inline bool FiberManager::loopUntilNoReady() {
144   SCOPE_EXIT {
145     isLoopScheduled_ = false;
146     if (!readyFibers_.empty()) {
147       ensureLoopScheduled();
148     }
149     currentFiberManager_ = nullptr;
150   };
151
152   currentFiberManager_ = this;
153
154   bool hadRemoteFiber = true;
155   while (hadRemoteFiber) {
156     hadRemoteFiber = false;
157
158     while (!readyFibers_.empty()) {
159       auto& fiber = readyFibers_.front();
160       readyFibers_.pop_front();
161       runReadyFiber(&fiber);
162     }
163
164     remoteReadyQueue_.sweep(
165       [this, &hadRemoteFiber] (Fiber* fiber) {
166         runReadyFiber(fiber);
167         hadRemoteFiber = true;
168       }
169     );
170
171     remoteTaskQueue_.sweep(
172       [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
173         std::unique_ptr<RemoteTask> task(taskPtr);
174         auto fiber = getFiber();
175         if (task->localData) {
176           fiber->localData_ = *task->localData;
177         }
178         fiber->rcontext_ = std::move(task->rcontext);
179
180         fiber->setFunction(std::move(task->func));
181         fiber->data_ = reinterpret_cast<intptr_t>(fiber);
182         if (observer_) {
183           observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
184         }
185         runReadyFiber(fiber);
186         hadRemoteFiber = true;
187       }
188     );
189   }
190
191   if (observer_) {
192     for (auto& yielded : yieldedFibers_) {
193       observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
194     }
195   }
196   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
197
198   return fibersActive_ > 0;
199 }
200
201 // We need this to be in a struct, not inlined in addTask, because clang crashes
202 // otherwise.
203 template <typename F>
204 struct FiberManager::AddTaskHelper {
205   class Func;
206
207   static constexpr bool allocateInBuffer =
208     sizeof(Func) <= Fiber::kUserBufferSize;
209
210   class Func {
211    public:
212     Func(F&& func, FiberManager& fm) :
213         func_(std::forward<F>(func)), fm_(fm) {}
214
215     void operator()() {
216       try {
217         func_();
218       } catch (...) {
219         fm_.exceptionCallback_(std::current_exception(),
220                                "running Func functor");
221       }
222       if (allocateInBuffer) {
223         this->~Func();
224       } else {
225         delete this;
226       }
227     }
228
229    private:
230     F func_;
231     FiberManager& fm_;
232   };
233 };
234
235 template <typename F>
236 void FiberManager::addTask(F&& func) {
237   typedef AddTaskHelper<F> Helper;
238
239   auto fiber = getFiber();
240   initLocalData(*fiber);
241
242   if (Helper::allocateInBuffer) {
243     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
244     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
245
246     fiber->setFunction(std::ref(*funcLoc));
247   } else {
248     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
249
250     fiber->setFunction(std::ref(*funcLoc));
251   }
252
253   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
254   readyFibers_.push_back(*fiber);
255   if (observer_) {
256     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
257   }
258
259   ensureLoopScheduled();
260 }
261
262 template <typename F>
263 auto FiberManager::addTaskFuture(F&& func)
264     -> folly::Future<typename std::result_of<F()>::type> {
265   using T = typename std::result_of<F()>::type;
266   folly::Promise<T> p;
267   auto f = p.getFuture();
268   addTaskFinally([func = std::forward<F>(func)]() mutable { return func(); },
269                  [p = std::move(p)](folly::Try<T> && t) mutable {
270                    p.setTry(std::move(t));
271                  });
272   return f;
273 }
274
275 template <typename F>
276 void FiberManager::addTaskRemote(F&& func) {
277   // addTaskRemote indirectly requires wrapping the function in a
278   // std::function, which must be copyable. As move-only lambdas may be
279   // passed in we wrap it first in a move wrapper and then capture the wrapped
280   // version.
281   auto functionWrapper = [f = folly::makeMoveWrapper(
282                               std::forward<F>(func))]() mutable {
283     return (*f)();
284   };
285   auto task = [&]() {
286     auto currentFm = getFiberManagerUnsafe();
287     if (currentFm &&
288         currentFm->currentFiber_ &&
289         currentFm->localType_ == localType_) {
290       return folly::make_unique<RemoteTask>(
291           std::move(functionWrapper), currentFm->currentFiber_->localData_);
292     }
293     return folly::make_unique<RemoteTask>(std::move(functionWrapper));
294   }();
295   auto insertHead =
296       [&]() { return remoteTaskQueue_.insertHead(task.release()); };
297   loopController_->scheduleThreadSafe(std::ref(insertHead));
298 }
299
300 template <typename F>
301 auto FiberManager::addTaskRemoteFuture(F&& func)
302     -> folly::Future<typename std::result_of<F()>::type> {
303   folly::Promise<typename std::result_of<F()>::type> p;
304   auto f = p.getFuture();
305   addTaskRemote(
306       [ p = std::move(p), func = std::forward<F>(func), this ]() mutable {
307         auto t = folly::makeTryWith(std::forward<F>(func));
308         runInMainContext([&]() { p.setTry(std::move(t)); });
309       });
310   return f;
311 }
312
313 template <typename X>
314 struct IsRvalueRefTry { static const bool value = false; };
315 template <typename T>
316 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
317
318 // We need this to be in a struct, not inlined in addTaskFinally, because clang
319 // crashes otherwise.
320 template <typename F, typename G>
321 struct FiberManager::AddTaskFinallyHelper {
322   class Func;
323   class Finally;
324
325   typedef typename std::result_of<F()>::type Result;
326
327   static constexpr bool allocateInBuffer =
328     sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
329
330   class Finally {
331    public:
332     Finally(G&& finally,
333             FiberManager& fm) :
334         finally_(std::move(finally)),
335         fm_(fm) {
336     }
337
338     void operator()() {
339       try {
340         finally_(std::move(*result_));
341       } catch (...) {
342         fm_.exceptionCallback_(std::current_exception(),
343                                "running Finally functor");
344       }
345
346       if (allocateInBuffer) {
347         this->~Finally();
348       } else {
349         delete this;
350       }
351     }
352
353    private:
354     friend class Func;
355
356     G finally_;
357     folly::Optional<folly::Try<Result>> result_;
358     FiberManager& fm_;
359   };
360
361   class Func {
362    public:
363     Func(F&& func, Finally& finally) :
364         func_(std::move(func)), result_(finally.result_) {}
365
366     void operator()() {
367       result_ = folly::makeTryWith(std::move(func_));
368
369       if (allocateInBuffer) {
370         this->~Func();
371       } else {
372         delete this;
373       }
374     }
375
376    private:
377     F func_;
378     folly::Optional<folly::Try<Result>>& result_;
379   };
380 };
381
382 template <typename F, typename G>
383 void FiberManager::addTaskFinally(F&& func, G&& finally) {
384   typedef typename std::result_of<F()>::type Result;
385
386   static_assert(
387     IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
388     "finally(arg): arg must be Try<T>&&");
389   static_assert(
390     std::is_convertible<
391       Result,
392       typename std::remove_reference<
393         typename FirstArgOf<G>::type
394       >::type::element_type
395     >::value,
396     "finally(Try<T>&&): T must be convertible from func()'s return type");
397
398   auto fiber = getFiber();
399   initLocalData(*fiber);
400
401   typedef AddTaskFinallyHelper<F,G> Helper;
402
403   if (Helper::allocateInBuffer) {
404     auto funcLoc = static_cast<typename Helper::Func*>(
405       fiber->getUserBuffer());
406     auto finallyLoc = static_cast<typename Helper::Finally*>(
407       static_cast<void*>(funcLoc + 1));
408
409     new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
410     new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
411
412     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
413   } else {
414     auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
415     auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
416
417     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
418   }
419
420   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
421   readyFibers_.push_back(*fiber);
422   if (observer_) {
423     observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
424   }
425
426   ensureLoopScheduled();
427 }
428
429 template <typename F>
430 typename std::result_of<F()>::type
431 FiberManager::runInMainContext(F&& func) {
432   return runInMainContextHelper(std::forward<F>(func));
433 }
434
435 template <typename F>
436 inline typename std::enable_if<
437   !std::is_same<typename std::result_of<F()>::type, void>::value,
438   typename std::result_of<F()>::type>::type
439 FiberManager::runInMainContextHelper(F&& func) {
440   if (UNLIKELY(activeFiber_ == nullptr)) {
441     return func();
442   }
443
444   typedef typename std::result_of<F()>::type Result;
445
446   folly::Try<Result> result;
447   auto f = [&func, &result]() mutable {
448     result = folly::makeTryWith(std::forward<F>(func));
449   };
450
451   immediateFunc_ = std::ref(f);
452   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
453
454   return std::move(result.value());
455 }
456
457 template <typename F>
458 inline typename std::enable_if<
459   std::is_same<typename std::result_of<F()>::type, void>::value,
460   void>::type
461 FiberManager::runInMainContextHelper(F&& func) {
462   if (UNLIKELY(activeFiber_ == nullptr)) {
463     func();
464     return;
465   }
466
467   immediateFunc_ = std::ref(func);
468   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
469 }
470
471 inline FiberManager& FiberManager::getFiberManager() {
472   assert(currentFiberManager_ != nullptr);
473   return *currentFiberManager_;
474 }
475
476 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
477   return currentFiberManager_;
478 }
479
480 inline bool FiberManager::hasActiveFiber() const {
481   return activeFiber_ != nullptr;
482 }
483
484 inline void FiberManager::yield() {
485   assert(currentFiberManager_ == this);
486   assert(activeFiber_ != nullptr);
487   assert(activeFiber_->state_ == Fiber::RUNNING);
488   activeFiber_->preempt(Fiber::YIELDED);
489 }
490
491 template <typename T>
492 T& FiberManager::local() {
493   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
494     return currentFiber_->localData_.get<T>();
495   }
496   return localThread<T>();
497 }
498
499 template <typename T>
500 T& FiberManager::localThread() {
501 #ifndef __APPLE__
502   static thread_local T t;
503   return t;
504 #else // osx doesn't support thread_local
505   static ThreadLocal<T> t;
506   return *t;
507 #endif
508 }
509
510 inline void FiberManager::initLocalData(Fiber& fiber) {
511   auto fm = getFiberManagerUnsafe();
512   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
513     fiber.localData_ = fm->currentFiber_->localData_;
514   }
515   fiber.rcontext_ = RequestContext::saveContext();
516 }
517
518 template <typename LocalT>
519 FiberManager::FiberManager(
520   LocalType<LocalT>,
521   std::unique_ptr<LoopController> loopController__,
522   Options options)  :
523     loopController_(std::move(loopController__)),
524     stackAllocator_(options.useGuardPages),
525     options_(preprocessOptions(std::move(options))),
526     exceptionCallback_([](std::exception_ptr eptr, std::string context) {
527         try {
528           std::rethrow_exception(eptr);
529         } catch (const std::exception& e) {
530           LOG(DFATAL) << "Exception " << typeid(e).name()
531                       << " with message '" << e.what() << "' was thrown in "
532                       << "FiberManager with context '" << context << "'";
533           throw;
534         } catch (...) {
535           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
536                       << "context '" << context << "'";
537           throw;
538         }
539       }),
540     timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
541     fibersPoolResizer_(*this),
542     localType_(typeid(LocalT)) {
543   loopController_->setFiberManager(this);
544 }
545
546 template <typename F>
547 typename FirstArgOf<F>::type::value_type
548 inline await(F&& func) {
549   typedef typename FirstArgOf<F>::type::value_type Result;
550
551   folly::Try<Result> result;
552
553   Baton baton;
554   baton.wait([&func, &result, &baton]() mutable {
555       func(Promise<Result>(result, baton));
556     });
557
558   return folly::moveFromTry(result);
559 }
560
561 }}