793cf1fab534ad6dd78fa7583d1a386b675d330c
[folly.git] / folly / experimental / fibers / FiberManager-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <cassert>
19
20 #include <folly/Memory.h>
21 #include <folly/Optional.h>
22 #include <folly/Portability.h>
23 #include <folly/ScopeGuard.h>
24 #include <folly/experimental/fibers/Baton.h>
25 #include <folly/experimental/fibers/Fiber.h>
26 #include <folly/experimental/fibers/Promise.h>
27 #include <folly/experimental/fibers/LoopController.h>
28 #include <folly/futures/Try.h>
29
30 namespace folly { namespace fibers {
31
32 inline void FiberManager::ensureLoopScheduled() {
33   if (isLoopScheduled_) {
34     return;
35   }
36
37   isLoopScheduled_ = true;
38   loopController_->schedule();
39 }
40
41 inline void FiberManager::runReadyFiber(Fiber* fiber) {
42   SCOPE_EXIT {
43     assert(currentFiber_ == nullptr);
44     assert(activeFiber_ == nullptr);
45   };
46
47   assert(fiber->state_ == Fiber::NOT_STARTED ||
48          fiber->state_ == Fiber::READY_TO_RUN);
49   currentFiber_ = fiber;
50   if (observer_) {
51     observer_->starting();
52   }
53
54   while (fiber->state_ == Fiber::NOT_STARTED ||
55          fiber->state_ == Fiber::READY_TO_RUN) {
56     activeFiber_ = fiber;
57     jumpContext(&mainContext_, &fiber->fcontext_, fiber->data_);
58     if (fiber->state_ == Fiber::AWAITING_IMMEDIATE) {
59       try {
60         immediateFunc_();
61       } catch (...) {
62         exceptionCallback_(std::current_exception(), "running immediateFunc_");
63       }
64       immediateFunc_ = nullptr;
65       fiber->state_ = Fiber::READY_TO_RUN;
66     }
67   }
68
69   if (fiber->state_ == Fiber::AWAITING) {
70     awaitFunc_(*fiber);
71     awaitFunc_ = nullptr;
72     if (observer_) {
73       observer_->stopped();
74     }
75     currentFiber_ = nullptr;
76   } else if (fiber->state_ == Fiber::INVALID) {
77     assert(fibersActive_ > 0);
78     --fibersActive_;
79     // Making sure that task functor is deleted once task is complete.
80     // NOTE: we must do it on main context, as the fiber is not
81     // running at this point.
82     fiber->func_ = nullptr;
83     fiber->resultFunc_ = nullptr;
84     if (fiber->finallyFunc_) {
85       try {
86         fiber->finallyFunc_();
87       } catch (...) {
88         exceptionCallback_(std::current_exception(), "running finallyFunc_");
89       }
90       fiber->finallyFunc_ = nullptr;
91     }
92     // Make sure LocalData is not accessible from its destructor
93     if (observer_) {
94       observer_->stopped();
95     }
96     currentFiber_ = nullptr;
97     fiber->localData_.reset();
98
99     if (fibersPoolSize_ < options_.maxFibersPoolSize) {
100       fibersPool_.push_front(*fiber);
101       ++fibersPoolSize_;
102     } else {
103       delete fiber;
104       assert(fibersAllocated_ > 0);
105       --fibersAllocated_;
106     }
107   } else if (fiber->state_ == Fiber::YIELDED) {
108     if (observer_) {
109       observer_->stopped();
110     }
111     currentFiber_ = nullptr;
112     fiber->state_ = Fiber::READY_TO_RUN;
113     yieldedFibers_.push_back(*fiber);
114   }
115 }
116
117 inline bool FiberManager::loopUntilNoReady() {
118   SCOPE_EXIT {
119     isLoopScheduled_ = false;
120     if (!readyFibers_.empty()) {
121       ensureLoopScheduled();
122     }
123     currentFiberManager_ = nullptr;
124   };
125
126   currentFiberManager_ = this;
127
128   bool hadRemoteFiber = true;
129   while (hadRemoteFiber) {
130     hadRemoteFiber = false;
131
132     while (!readyFibers_.empty()) {
133       auto& fiber = readyFibers_.front();
134       readyFibers_.pop_front();
135       runReadyFiber(&fiber);
136     }
137
138     remoteReadyQueue_.sweep(
139       [this, &hadRemoteFiber] (Fiber* fiber) {
140         runReadyFiber(fiber);
141         hadRemoteFiber = true;
142       }
143     );
144
145     remoteTaskQueue_.sweep(
146       [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
147         std::unique_ptr<RemoteTask> task(taskPtr);
148         auto fiber = getFiber();
149         if (task->localData) {
150           fiber->localData_ = *task->localData;
151         }
152
153         fiber->setFunction(std::move(task->func));
154         fiber->data_ = reinterpret_cast<intptr_t>(fiber);
155         runReadyFiber(fiber);
156         hadRemoteFiber = true;
157       }
158     );
159   }
160
161   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
162
163   return fibersActive_ > 0;
164 }
165
166 // We need this to be in a struct, not inlined in addTask, because clang crashes
167 // otherwise.
168 template <typename F>
169 struct FiberManager::AddTaskHelper {
170   class Func;
171
172   static constexpr bool allocateInBuffer =
173     sizeof(Func) <= Fiber::kUserBufferSize;
174
175   class Func {
176    public:
177     Func(F&& func, FiberManager& fm) :
178         func_(std::forward<F>(func)), fm_(fm) {}
179
180     void operator()() {
181       try {
182         func_();
183       } catch (...) {
184         fm_.exceptionCallback_(std::current_exception(),
185                                "running Func functor");
186       }
187       if (allocateInBuffer) {
188         this->~Func();
189       } else {
190         delete this;
191       }
192     }
193
194    private:
195     F func_;
196     FiberManager& fm_;
197   };
198 };
199
200 template <typename F>
201 void FiberManager::addTask(F&& func) {
202   typedef AddTaskHelper<F> Helper;
203
204   auto fiber = getFiber();
205   initLocalData(*fiber);
206
207   if (Helper::allocateInBuffer) {
208     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
209     new (funcLoc) typename Helper::Func(std::forward<F>(func), *this);
210
211     fiber->setFunction(std::ref(*funcLoc));
212   } else {
213     auto funcLoc = new typename Helper::Func(std::forward<F>(func), *this);
214
215     fiber->setFunction(std::ref(*funcLoc));
216   }
217
218   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
219   readyFibers_.push_back(*fiber);
220
221   ensureLoopScheduled();
222 }
223
224 template <typename F>
225 void FiberManager::addTaskRemote(F&& func) {
226   auto task = [&]() {
227     auto currentFm = getFiberManagerUnsafe();
228     if (currentFm &&
229         currentFm->currentFiber_ &&
230         currentFm->localType_ == localType_) {
231       return folly::make_unique<RemoteTask>(
232         std::forward<F>(func),
233         currentFm->currentFiber_->localData_);
234     }
235     return folly::make_unique<RemoteTask>(std::forward<F>(func));
236   }();
237   if (remoteTaskQueue_.insertHead(task.release())) {
238     loopController_->scheduleThreadSafe();
239   }
240 }
241
242 template <typename X>
243 struct IsRvalueRefTry { static const bool value = false; };
244 template <typename T>
245 struct IsRvalueRefTry<folly::Try<T>&&> { static const bool value = true; };
246
247 // We need this to be in a struct, not inlined in addTaskFinally, because clang
248 // crashes otherwise.
249 template <typename F, typename G>
250 struct FiberManager::AddTaskFinallyHelper {
251   class Func;
252   class Finally;
253
254   typedef typename std::result_of<F()>::type Result;
255
256   static constexpr bool allocateInBuffer =
257     sizeof(Func) + sizeof(Finally) <= Fiber::kUserBufferSize;
258
259   class Finally {
260    public:
261     Finally(G&& finally,
262             FiberManager& fm) :
263         finally_(std::move(finally)),
264         fm_(fm) {
265     }
266
267     void operator()() {
268       try {
269         finally_(std::move(*result_));
270       } catch (...) {
271         fm_.exceptionCallback_(std::current_exception(),
272                                "running Finally functor");
273       }
274
275       if (allocateInBuffer) {
276         this->~Finally();
277       } else {
278         delete this;
279       }
280     }
281
282    private:
283     friend class Func;
284
285     G finally_;
286     folly::Optional<folly::Try<Result>> result_;
287     FiberManager& fm_;
288   };
289
290   class Func {
291    public:
292     Func(F&& func, Finally& finally) :
293         func_(std::move(func)), result_(finally.result_) {}
294
295     void operator()() {
296       result_ = folly::makeTryFunction(std::move(func_));
297
298       if (allocateInBuffer) {
299         this->~Func();
300       } else {
301         delete this;
302       }
303     }
304
305    private:
306     F func_;
307     folly::Optional<folly::Try<Result>>& result_;
308   };
309 };
310
311 template <typename F, typename G>
312 void FiberManager::addTaskFinally(F&& func, G&& finally) {
313   typedef typename std::result_of<F()>::type Result;
314
315   static_assert(
316     IsRvalueRefTry<typename FirstArgOf<G>::type>::value,
317     "finally(arg): arg must be Try<T>&&");
318   static_assert(
319     std::is_convertible<
320       Result,
321       typename std::remove_reference<
322         typename FirstArgOf<G>::type
323       >::type::element_type
324     >::value,
325     "finally(Try<T>&&): T must be convertible from func()'s return type");
326
327   auto fiber = getFiber();
328   initLocalData(*fiber);
329
330   typedef AddTaskFinallyHelper<F,G> Helper;
331
332   if (Helper::allocateInBuffer) {
333     auto funcLoc = static_cast<typename Helper::Func*>(
334       fiber->getUserBuffer());
335     auto finallyLoc = static_cast<typename Helper::Finally*>(
336       static_cast<void*>(funcLoc + 1));
337
338     new (finallyLoc) typename Helper::Finally(std::move(finally), *this);
339     new (funcLoc) typename Helper::Func(std::move(func), *finallyLoc);
340
341     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
342   } else {
343     auto finallyLoc = new typename Helper::Finally(std::move(finally), *this);
344     auto funcLoc = new typename Helper::Func(std::move(func), *finallyLoc);
345
346     fiber->setFunctionFinally(std::ref(*funcLoc), std::ref(*finallyLoc));
347   }
348
349   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
350   readyFibers_.push_back(*fiber);
351
352   ensureLoopScheduled();
353 }
354
355 template <typename F>
356 typename std::result_of<F()>::type
357 FiberManager::runInMainContext(F&& func) {
358   return runInMainContextHelper(std::forward<F>(func));
359 }
360
361 template <typename F>
362 inline typename std::enable_if<
363   !std::is_same<typename std::result_of<F()>::type, void>::value,
364   typename std::result_of<F()>::type>::type
365 FiberManager::runInMainContextHelper(F&& func) {
366   if (UNLIKELY(activeFiber_ == nullptr)) {
367     return func();
368   }
369
370   typedef typename std::result_of<F()>::type Result;
371
372   folly::Try<Result> result;
373   auto f = [&func, &result]() mutable {
374     result = folly::makeTryFunction(std::forward<F>(func));
375   };
376
377   immediateFunc_ = std::ref(f);
378   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
379
380   return std::move(result.value());
381 }
382
383 template <typename F>
384 inline typename std::enable_if<
385   std::is_same<typename std::result_of<F()>::type, void>::value,
386   void>::type
387 FiberManager::runInMainContextHelper(F&& func) {
388   if (UNLIKELY(activeFiber_ == nullptr)) {
389     func();
390     return;
391   }
392
393   immediateFunc_ = std::ref(func);
394   activeFiber_->preempt(Fiber::AWAITING_IMMEDIATE);
395 }
396
397 inline FiberManager& FiberManager::getFiberManager() {
398   assert(currentFiberManager_ != nullptr);
399   return *currentFiberManager_;
400 }
401
402 inline FiberManager* FiberManager::getFiberManagerUnsafe() {
403   return currentFiberManager_;
404 }
405
406 inline bool FiberManager::hasActiveFiber() const {
407   return activeFiber_ != nullptr;
408 }
409
410 inline void FiberManager::yield() {
411   assert(currentFiberManager_ == this);
412   assert(activeFiber_ != nullptr);
413   assert(activeFiber_->state_ == Fiber::RUNNING);
414   activeFiber_->preempt(Fiber::YIELDED);
415 }
416
417 template <typename T>
418 T& FiberManager::local() {
419   if (std::type_index(typeid(T)) == localType_ && currentFiber_) {
420     return currentFiber_->localData_.get<T>();
421   }
422   return localThread<T>();
423 }
424
425 template <typename T>
426 T& FiberManager::localThread() {
427   static thread_local T t;
428   return t;
429 }
430
431 inline void FiberManager::initLocalData(Fiber& fiber) {
432   auto fm = getFiberManagerUnsafe();
433   if (fm && fm->currentFiber_ && fm->localType_ == localType_) {
434     fiber.localData_ = fm->currentFiber_->localData_;
435   }
436 }
437
438 template <typename LocalT>
439 FiberManager::FiberManager(
440   LocalType<LocalT>,
441   std::unique_ptr<LoopController> loopController__,
442   Options options)  :
443     loopController_(std::move(loopController__)),
444     options_(std::move(options)),
445     exceptionCallback_([](std::exception_ptr eptr, std::string context) {
446         try {
447           std::rethrow_exception(eptr);
448         } catch (const std::exception& e) {
449           LOG(DFATAL) << "Exception " << typeid(e).name()
450                       << " with message '" << e.what() << "' was thrown in "
451                       << "FiberManager with context '" << context << "'";
452           throw;
453         } catch (...) {
454           LOG(DFATAL) << "Unknown exception was thrown in FiberManager with "
455                       << "context '" << context << "'";
456           throw;
457         }
458       }),
459     timeoutManager_(std::make_shared<TimeoutController>(*loopController_)),
460     localType_(typeid(LocalT)) {
461   loopController_->setFiberManager(this);
462 }
463
464 template <typename F>
465 typename FirstArgOf<F>::type::value_type
466 inline await(F&& func) {
467   typedef typename FirstArgOf<F>::type::value_type Result;
468
469   folly::Try<Result> result;
470
471   Baton baton;
472   baton.wait([&func, &result, &baton]() mutable {
473       func(Promise<Result>(result, baton));
474     });
475
476   return folly::moveFromTry(std::move(result));
477 }
478
479 }}