capture exception information when creating exception_wrapper
[folly.git] / folly / futures / detail / Core.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <mutex>
21 #include <stdexcept>
22 #include <vector>
23
24 #include <folly/Executor.h>
25 #include <folly/Function.h>
26 #include <folly/MicroSpinLock.h>
27 #include <folly/Optional.h>
28 #include <folly/ScopeGuard.h>
29 #include <folly/Try.h>
30 #include <folly/futures/Future.h>
31 #include <folly/futures/Promise.h>
32 #include <folly/futures/detail/FSM.h>
33
34 #include <folly/io/async/Request.h>
35
36 namespace folly { namespace detail {
37
38 /*
39         OnlyCallback
40        /            \
41   Start              Armed - Done
42        \            /
43          OnlyResult
44
45 This state machine is fairly self-explanatory. The most important bit is
46 that the callback is only executed on the transition from Armed to Done,
47 and that transition can happen immediately after transitioning from Only*
48 to Armed, if it is active (the usual case).
49 */
50 enum class State : uint8_t {
51   Start,
52   OnlyResult,
53   OnlyCallback,
54   Armed,
55   Done,
56 };
57
58 /// The shared state object for Future and Promise.
59 /// Some methods must only be called by either the Future thread or the
60 /// Promise thread. The Future thread is the thread that currently "owns" the
61 /// Future and its callback-related operations, and the Promise thread is
62 /// likewise the thread that currently "owns" the Promise and its
63 /// result-related operations. Also, Futures own interruption, Promises own
64 /// interrupt handlers. Unfortunately, there are things that users can do to
65 /// break this, and we can't detect that. However if they follow move
66 /// semantics religiously wrt threading, they should be ok.
67 ///
68 /// It's worth pointing out that Futures and/or Promises can and usually will
69 /// migrate between threads, though this usually happens within the API code.
70 /// For example, an async operation will probably make a Promise, grab its
71 /// Future, then move the Promise into another thread that will eventually
72 /// fulfill it. With executors and via, this gets slightly more complicated at
73 /// first blush, but it's the same principle. In general, as long as the user
74 /// doesn't access a Future or Promise object from more than one thread at a
75 /// time there won't be any problems.
76 template<typename T>
77 class Core final {
78   static_assert(!std::is_void<T>::value,
79                 "void futures are not supported. Use Unit instead.");
80  public:
81   /// This must be heap-constructed. There's probably a way to enforce that in
82   /// code but since this is just internal detail code and I don't know how
83   /// off-hand, I'm punting.
84   Core() : result_(), fsm_(State::Start), attached_(2) {}
85
86   explicit Core(Try<T>&& t)
87     : result_(std::move(t)),
88       fsm_(State::OnlyResult),
89       attached_(1) {}
90
91   ~Core() {
92     DCHECK(attached_ == 0);
93   }
94
95   // not copyable
96   Core(Core const&) = delete;
97   Core& operator=(Core const&) = delete;
98
99   // not movable (see comment in the implementation of Future::then)
100   Core(Core&&) noexcept = delete;
101   Core& operator=(Core&&) = delete;
102
103   /// May call from any thread
104   bool hasResult() const {
105     switch (fsm_.getState()) {
106       case State::OnlyResult:
107       case State::Armed:
108       case State::Done:
109         assert(!!result_);
110         return true;
111
112       default:
113         return false;
114     }
115   }
116
117   /// May call from any thread
118   bool ready() const {
119     return hasResult();
120   }
121
122   /// May call from any thread
123   Try<T>& getTry() {
124     if (ready()) {
125       return *result_;
126     } else {
127       throw FutureNotReady();
128     }
129   }
130
131   /// Call only from Future thread.
132   template <typename F>
133   void setCallback(F&& func) {
134     bool transitionToArmed = false;
135     auto setCallback_ = [&]{
136       context_ = RequestContext::saveContext();
137       callback_ = std::forward<F>(func);
138     };
139
140     FSM_START(fsm_)
141       case State::Start:
142         FSM_UPDATE(fsm_, State::OnlyCallback, setCallback_);
143         break;
144
145       case State::OnlyResult:
146         FSM_UPDATE(fsm_, State::Armed, setCallback_);
147         transitionToArmed = true;
148         break;
149
150       case State::OnlyCallback:
151       case State::Armed:
152       case State::Done:
153         throw std::logic_error("setCallback called twice");
154     FSM_END
155
156     // we could always call this, it is an optimization to only call it when
157     // it might be needed.
158     if (transitionToArmed) {
159       maybeCallback();
160     }
161   }
162
163   /// Call only from Promise thread
164   void setResult(Try<T>&& t) {
165     bool transitionToArmed = false;
166     auto setResult_ = [&]{ result_ = std::move(t); };
167     FSM_START(fsm_)
168       case State::Start:
169         FSM_UPDATE(fsm_, State::OnlyResult, setResult_);
170         break;
171
172       case State::OnlyCallback:
173         FSM_UPDATE(fsm_, State::Armed, setResult_);
174         transitionToArmed = true;
175         break;
176
177       case State::OnlyResult:
178       case State::Armed:
179       case State::Done:
180         throw std::logic_error("setResult called twice");
181     FSM_END
182
183     if (transitionToArmed) {
184       maybeCallback();
185     }
186   }
187
188   /// Called by a destructing Future (in the Future thread, by definition)
189   void detachFuture() {
190     activate();
191     detachOne();
192   }
193
194   /// Called by a destructing Promise (in the Promise thread, by definition)
195   void detachPromise() {
196     // detachPromise() and setResult() should never be called in parallel
197     // so we don't need to protect this.
198     if (UNLIKELY(!result_)) {
199       setResult(Try<T>(exception_wrapper(BrokenPromise(typeid(T).name()))));
200     }
201     detachOne();
202   }
203
204   /// May call from any thread
205   void deactivate() {
206     active_.store(false, std::memory_order_release);
207   }
208
209   /// May call from any thread
210   void activate() {
211     active_.store(true, std::memory_order_release);
212     maybeCallback();
213   }
214
215   /// May call from any thread
216   bool isActive() { return active_.load(std::memory_order_acquire); }
217
218   /// Call only from Future thread
219   void setExecutor(Executor* x, int8_t priority = Executor::MID_PRI) {
220     if (!executorLock_.try_lock()) {
221       executorLock_.lock();
222     }
223     executor_ = x;
224     priority_ = priority;
225     executorLock_.unlock();
226   }
227
228   void setExecutorNoLock(Executor* x, int8_t priority = Executor::MID_PRI) {
229     executor_ = x;
230     priority_ = priority;
231   }
232
233   Executor* getExecutor() {
234     return executor_;
235   }
236
237   /// Call only from Future thread
238   void raise(exception_wrapper e) {
239     if (!interruptLock_.try_lock()) {
240       interruptLock_.lock();
241     }
242     if (!interrupt_ && !hasResult()) {
243       interrupt_ = folly::make_unique<exception_wrapper>(std::move(e));
244       if (interruptHandler_) {
245         interruptHandler_(*interrupt_);
246       }
247     }
248     interruptLock_.unlock();
249   }
250
251   std::function<void(exception_wrapper const&)> getInterruptHandler() {
252     if (!interruptHandlerSet_.load(std::memory_order_acquire)) {
253       return nullptr;
254     }
255     if (!interruptLock_.try_lock()) {
256       interruptLock_.lock();
257     }
258     auto handler = interruptHandler_;
259     interruptLock_.unlock();
260     return handler;
261   }
262
263   /// Call only from Promise thread
264   void setInterruptHandler(std::function<void(exception_wrapper const&)> fn) {
265     if (!interruptLock_.try_lock()) {
266       interruptLock_.lock();
267     }
268     if (!hasResult()) {
269       if (interrupt_) {
270         fn(*interrupt_);
271       } else {
272         setInterruptHandlerNoLock(std::move(fn));
273       }
274     }
275     interruptLock_.unlock();
276   }
277
278   void setInterruptHandlerNoLock(
279       std::function<void(exception_wrapper const&)> fn) {
280     interruptHandlerSet_.store(true, std::memory_order_relaxed);
281     interruptHandler_ = std::move(fn);
282   }
283
284  private:
285   class CountedReference {
286    public:
287     ~CountedReference() {
288       if (core_) {
289         core_->detachOne();
290         core_ = nullptr;
291       }
292     }
293
294     explicit CountedReference(Core* core) noexcept : core_(core) {
295       // do not construct a CountedReference from nullptr!
296       DCHECK(core);
297
298       ++core_->attached_;
299     }
300
301     // CountedReference must be copy-constructable as long as
302     // folly::Executor::add takes a std::function
303     CountedReference(CountedReference const& o) noexcept : core_(o.core_) {
304       if (core_) {
305         ++core_->attached_;
306       }
307     }
308
309     CountedReference& operator=(CountedReference const& o) noexcept {
310       ~CountedReference();
311       new (this) CountedReference(o);
312       return *this;
313     }
314
315     CountedReference(CountedReference&& o) noexcept {
316       std::swap(core_, o.core_);
317     }
318
319     CountedReference& operator=(CountedReference&& o) noexcept {
320       ~CountedReference();
321       new (this) CountedReference(std::move(o));
322       return *this;
323     }
324
325     Core* getCore() const noexcept {
326       return core_;
327     }
328
329    private:
330     Core* core_{nullptr};
331   };
332
333   void maybeCallback() {
334     FSM_START(fsm_)
335       case State::Armed:
336         if (active_.load(std::memory_order_acquire)) {
337           FSM_UPDATE2(fsm_, State::Done, []{}, [this]{ this->doCallback(); });
338         }
339         FSM_BREAK
340
341       default:
342         FSM_BREAK
343     FSM_END
344   }
345
346   void doCallback() {
347     Executor* x = executor_;
348     int8_t priority;
349     if (x) {
350       if (!executorLock_.try_lock()) {
351         executorLock_.lock();
352       }
353       x = executor_;
354       priority = priority_;
355       executorLock_.unlock();
356     }
357
358     if (x) {
359       exception_wrapper ew;
360       try {
361         if (LIKELY(x->getNumPriorities() == 1)) {
362           x->add([core_ref = CountedReference(this)]() mutable {
363             auto cr = std::move(core_ref);
364             Core* const core = cr.getCore();
365             RequestContextScopeGuard rctx(core->context_);
366             SCOPE_EXIT { core->callback_ = {}; };
367             core->callback_(std::move(*core->result_));
368           });
369         } else {
370           x->addWithPriority([core_ref = CountedReference(this)]() mutable {
371             auto cr = std::move(core_ref);
372             Core* const core = cr.getCore();
373             RequestContextScopeGuard rctx(core->context_);
374             SCOPE_EXIT { core->callback_ = {}; };
375             core->callback_(std::move(*core->result_));
376           }, priority);
377         }
378       } catch (const std::exception& e) {
379         ew = exception_wrapper(std::current_exception(), e);
380       } catch (...) {
381         ew = exception_wrapper(std::current_exception());
382       }
383       if (ew) {
384         CountedReference core_ref(this);
385         RequestContextScopeGuard rctx(context_);
386         result_ = Try<T>(std::move(ew));
387         SCOPE_EXIT { callback_ = {}; };
388         callback_(std::move(*result_));
389       }
390     } else {
391       CountedReference core_ref(this);
392       RequestContextScopeGuard rctx(context_);
393       SCOPE_EXIT { callback_ = {}; };
394       callback_(std::move(*result_));
395     }
396   }
397
398   void detachOne() {
399     auto a = attached_--;
400     assert(a >= 1);
401     if (a == 1) {
402       delete this;
403     }
404   }
405
406
407   folly::Function<void(Try<T>&&)> callback_;
408   // place result_ next to increase the likelihood that the value will be
409   // contained entirely in one cache line
410   folly::Optional<Try<T>> result_;
411   FSM<State> fsm_;
412   std::atomic<unsigned char> attached_;
413   std::atomic<bool> active_ {true};
414   std::atomic<bool> interruptHandlerSet_ {false};
415   folly::MicroSpinLock interruptLock_ {0};
416   folly::MicroSpinLock executorLock_ {0};
417   int8_t priority_ {-1};
418   Executor* executor_ {nullptr};
419   std::shared_ptr<RequestContext> context_ {nullptr};
420   std::unique_ptr<exception_wrapper> interrupt_ {};
421   std::function<void(exception_wrapper const&)> interruptHandler_ {nullptr};
422 };
423
424 template <typename... Ts>
425 struct CollectAllVariadicContext {
426   CollectAllVariadicContext() {}
427   template <typename T, size_t I>
428   inline void setPartialResult(Try<T>& t) {
429     std::get<I>(results) = std::move(t);
430   }
431   ~CollectAllVariadicContext() {
432     p.setValue(std::move(results));
433   }
434   Promise<std::tuple<Try<Ts>...>> p;
435   std::tuple<Try<Ts>...> results;
436   typedef Future<std::tuple<Try<Ts>...>> type;
437 };
438
439 template <typename... Ts>
440 struct CollectVariadicContext {
441   CollectVariadicContext() {}
442   template <typename T, size_t I>
443   inline void setPartialResult(Try<T>& t) {
444     if (t.hasException()) {
445        if (!threw.exchange(true)) {
446          p.setException(std::move(t.exception()));
447        }
448      } else if (!threw) {
449        std::get<I>(results) = std::move(t);
450      }
451   }
452   ~CollectVariadicContext() noexcept {
453     if (!threw.exchange(true)) {
454       p.setValue(unwrapTryTuple(std::move(results)));
455     }
456   }
457   Promise<std::tuple<Ts...>> p;
458   std::tuple<folly::Try<Ts>...> results;
459   std::atomic<bool> threw {false};
460   typedef Future<std::tuple<Ts...>> type;
461 };
462
463 template <template <typename...> class T, typename... Ts>
464 void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& /* ctx */) {
465   // base case
466 }
467
468 template <template <typename ...> class T, typename... Ts,
469           typename THead, typename... TTail>
470 void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& ctx,
471                            THead&& head, TTail&&... tail) {
472   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
473     ctx->template setPartialResult<typename THead::value_type,
474                                    sizeof...(Ts) - sizeof...(TTail) - 1>(t);
475   });
476   // template tail-recursion
477   collectVariadicHelper(ctx, std::forward<TTail>(tail)...);
478 }
479
480 }} // folly::detail