fix dead-lock in Future when executor discards function
[folly.git] / folly / futures / detail / Core.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <mutex>
21 #include <stdexcept>
22 #include <vector>
23
24 #include <folly/Executor.h>
25 #include <folly/Function.h>
26 #include <folly/MicroSpinLock.h>
27 #include <folly/Optional.h>
28 #include <folly/ScopeGuard.h>
29 #include <folly/Try.h>
30 #include <folly/futures/Future.h>
31 #include <folly/futures/Promise.h>
32 #include <folly/futures/detail/FSM.h>
33
34 #include <folly/io/async/Request.h>
35
36 namespace folly { namespace detail {
37
38 /*
39         OnlyCallback
40        /            \
41   Start              Armed - Done
42        \            /
43          OnlyResult
44
45 This state machine is fairly self-explanatory. The most important bit is
46 that the callback is only executed on the transition from Armed to Done,
47 and that transition can happen immediately after transitioning from Only*
48 to Armed, if it is active (the usual case).
49 */
50 enum class State : uint8_t {
51   Start,
52   OnlyResult,
53   OnlyCallback,
54   Armed,
55   Done,
56 };
57
58 /// The shared state object for Future and Promise.
59 /// Some methods must only be called by either the Future thread or the
60 /// Promise thread. The Future thread is the thread that currently "owns" the
61 /// Future and its callback-related operations, and the Promise thread is
62 /// likewise the thread that currently "owns" the Promise and its
63 /// result-related operations. Also, Futures own interruption, Promises own
64 /// interrupt handlers. Unfortunately, there are things that users can do to
65 /// break this, and we can't detect that. However if they follow move
66 /// semantics religiously wrt threading, they should be ok.
67 ///
68 /// It's worth pointing out that Futures and/or Promises can and usually will
69 /// migrate between threads, though this usually happens within the API code.
70 /// For example, an async operation will probably make a Promise, grab its
71 /// Future, then move the Promise into another thread that will eventually
72 /// fulfill it. With executors and via, this gets slightly more complicated at
73 /// first blush, but it's the same principle. In general, as long as the user
74 /// doesn't access a Future or Promise object from more than one thread at a
75 /// time there won't be any problems.
76 template<typename T>
77 class Core final {
78   static_assert(!std::is_void<T>::value,
79                 "void futures are not supported. Use Unit instead.");
80  public:
81   /// This must be heap-constructed. There's probably a way to enforce that in
82   /// code but since this is just internal detail code and I don't know how
83   /// off-hand, I'm punting.
84   Core() : result_(), fsm_(State::Start), attached_(2) {}
85
86   explicit Core(Try<T>&& t)
87     : result_(std::move(t)),
88       fsm_(State::OnlyResult),
89       attached_(1) {}
90
91   ~Core() {
92     DCHECK(attached_ == 0);
93   }
94
95   // not copyable
96   Core(Core const&) = delete;
97   Core& operator=(Core const&) = delete;
98
99   // not movable (see comment in the implementation of Future::then)
100   Core(Core&&) noexcept = delete;
101   Core& operator=(Core&&) = delete;
102
103   /// May call from any thread
104   bool hasResult() const {
105     switch (fsm_.getState()) {
106       case State::OnlyResult:
107       case State::Armed:
108       case State::Done:
109         assert(!!result_);
110         return true;
111
112       default:
113         return false;
114     }
115   }
116
117   /// May call from any thread
118   bool ready() const {
119     return hasResult();
120   }
121
122   /// May call from any thread
123   Try<T>& getTry() {
124     if (ready()) {
125       return *result_;
126     } else {
127       throw FutureNotReady();
128     }
129   }
130
131   /// Call only from Future thread.
132   template <typename F>
133   void setCallback(F&& func) {
134     bool transitionToArmed = false;
135     auto setCallback_ = [&]{
136       context_ = RequestContext::saveContext();
137       callback_ = std::forward<F>(func);
138     };
139
140     FSM_START(fsm_)
141       case State::Start:
142         FSM_UPDATE(fsm_, State::OnlyCallback, setCallback_);
143         break;
144
145       case State::OnlyResult:
146         FSM_UPDATE(fsm_, State::Armed, setCallback_);
147         transitionToArmed = true;
148         break;
149
150       case State::OnlyCallback:
151       case State::Armed:
152       case State::Done:
153         throw std::logic_error("setCallback called twice");
154     FSM_END
155
156     // we could always call this, it is an optimization to only call it when
157     // it might be needed.
158     if (transitionToArmed) {
159       maybeCallback();
160     }
161   }
162
163   /// Call only from Promise thread
164   void setResult(Try<T>&& t) {
165     bool transitionToArmed = false;
166     auto setResult_ = [&]{ result_ = std::move(t); };
167     FSM_START(fsm_)
168       case State::Start:
169         FSM_UPDATE(fsm_, State::OnlyResult, setResult_);
170         break;
171
172       case State::OnlyCallback:
173         FSM_UPDATE(fsm_, State::Armed, setResult_);
174         transitionToArmed = true;
175         break;
176
177       case State::OnlyResult:
178       case State::Armed:
179       case State::Done:
180         throw std::logic_error("setResult called twice");
181     FSM_END
182
183     if (transitionToArmed) {
184       maybeCallback();
185     }
186   }
187
188   /// Called by a destructing Future (in the Future thread, by definition)
189   void detachFuture() {
190     activate();
191     detachOne();
192   }
193
194   /// Called by a destructing Promise (in the Promise thread, by definition)
195   void detachPromise() {
196     // detachPromise() and setResult() should never be called in parallel
197     // so we don't need to protect this.
198     if (UNLIKELY(!result_)) {
199       setResult(Try<T>(exception_wrapper(BrokenPromise(typeid(T).name()))));
200     }
201     detachOne();
202   }
203
204   /// May call from any thread
205   void deactivate() {
206     active_.store(false, std::memory_order_release);
207   }
208
209   /// May call from any thread
210   void activate() {
211     active_.store(true, std::memory_order_release);
212     maybeCallback();
213   }
214
215   /// May call from any thread
216   bool isActive() { return active_.load(std::memory_order_acquire); }
217
218   /// Call only from Future thread
219   void setExecutor(Executor* x, int8_t priority = Executor::MID_PRI) {
220     if (!executorLock_.try_lock()) {
221       executorLock_.lock();
222     }
223     executor_ = x;
224     priority_ = priority;
225     executorLock_.unlock();
226   }
227
228   void setExecutorNoLock(Executor* x, int8_t priority = Executor::MID_PRI) {
229     executor_ = x;
230     priority_ = priority;
231   }
232
233   Executor* getExecutor() {
234     return executor_;
235   }
236
237   /// Call only from Future thread
238   void raise(exception_wrapper e) {
239     if (!interruptLock_.try_lock()) {
240       interruptLock_.lock();
241     }
242     if (!interrupt_ && !hasResult()) {
243       interrupt_ = folly::make_unique<exception_wrapper>(std::move(e));
244       if (interruptHandler_) {
245         interruptHandler_(*interrupt_);
246       }
247     }
248     interruptLock_.unlock();
249   }
250
251   std::function<void(exception_wrapper const&)> getInterruptHandler() {
252     if (!interruptHandlerSet_.load(std::memory_order_acquire)) {
253       return nullptr;
254     }
255     if (!interruptLock_.try_lock()) {
256       interruptLock_.lock();
257     }
258     auto handler = interruptHandler_;
259     interruptLock_.unlock();
260     return handler;
261   }
262
263   /// Call only from Promise thread
264   void setInterruptHandler(std::function<void(exception_wrapper const&)> fn) {
265     if (!interruptLock_.try_lock()) {
266       interruptLock_.lock();
267     }
268     if (!hasResult()) {
269       if (interrupt_) {
270         fn(*interrupt_);
271       } else {
272         setInterruptHandlerNoLock(std::move(fn));
273       }
274     }
275     interruptLock_.unlock();
276   }
277
278   void setInterruptHandlerNoLock(
279       std::function<void(exception_wrapper const&)> fn) {
280     interruptHandlerSet_.store(true, std::memory_order_relaxed);
281     interruptHandler_ = std::move(fn);
282   }
283
284  private:
285   // Helper class that stores a pointer to the `Core` object and calls
286   // `derefCallback` and `detachOne` in the destructor.
287   class CoreAndCallbackReference {
288    public:
289     explicit CoreAndCallbackReference(Core* core) noexcept : core_(core) {}
290
291     ~CoreAndCallbackReference() {
292       if (core_) {
293         core_->derefCallback();
294         core_->detachOne();
295       }
296     }
297
298     CoreAndCallbackReference(CoreAndCallbackReference const& o) = delete;
299     CoreAndCallbackReference& operator=(CoreAndCallbackReference const& o) =
300         delete;
301
302     CoreAndCallbackReference(CoreAndCallbackReference&& o) noexcept {
303       std::swap(core_, o.core_);
304     }
305
306     Core* getCore() const noexcept {
307       return core_;
308     }
309
310    private:
311     Core* core_{nullptr};
312   };
313
314   void maybeCallback() {
315     FSM_START(fsm_)
316       case State::Armed:
317         if (active_.load(std::memory_order_acquire)) {
318           FSM_UPDATE2(fsm_, State::Done, []{}, [this]{ this->doCallback(); });
319         }
320         FSM_BREAK
321
322       default:
323         FSM_BREAK
324     FSM_END
325   }
326
327   void doCallback() {
328     Executor* x = executor_;
329     int8_t priority;
330     if (x) {
331       if (!executorLock_.try_lock()) {
332         executorLock_.lock();
333       }
334       x = executor_;
335       priority = priority_;
336       executorLock_.unlock();
337     }
338
339     if (x) {
340       exception_wrapper ew;
341       // We need to reset `callback_` after it was executed (which can happen
342       // through the executor or, if `Executor::add` throws, below). The
343       // executor might discard the function without executing it (now or
344       // later), in which case `callback_` also needs to be reset.
345       // The `Core` has to be kept alive throughout that time, too. Hence we
346       // increment `attached_` and `callbackReferences_` by two, and construct
347       // exactly two `CoreAndCallbackReference` objects, which call
348       // `derefCallback` and `detachOne` in their destructor. One will guard
349       // this scope, the other one will guard the lambda passed to the executor.
350       attached_ += 2;
351       callbackReferences_ += 2;
352       CoreAndCallbackReference guard_local_scope(this);
353       CoreAndCallbackReference guard_lambda(this);
354       try {
355         if (LIKELY(x->getNumPriorities() == 1)) {
356           x->add([core_ref = std::move(guard_lambda)]() mutable {
357             auto cr = std::move(core_ref);
358             Core* const core = cr.getCore();
359             RequestContextScopeGuard rctx(core->context_);
360             core->callback_(std::move(*core->result_));
361           });
362         } else {
363           x->addWithPriority(
364               [core_ref = std::move(guard_lambda)]() mutable {
365                 auto cr = std::move(core_ref);
366                 Core* const core = cr.getCore();
367                 RequestContextScopeGuard rctx(core->context_);
368                 core->callback_(std::move(*core->result_));
369               },
370               priority);
371         }
372       } catch (const std::exception& e) {
373         ew = exception_wrapper(std::current_exception(), e);
374       } catch (...) {
375         ew = exception_wrapper(std::current_exception());
376       }
377       if (ew) {
378         RequestContextScopeGuard rctx(context_);
379         result_ = Try<T>(std::move(ew));
380         callback_(std::move(*result_));
381       }
382     } else {
383       attached_++;
384       SCOPE_EXIT {
385         callback_ = {};
386         detachOne();
387       };
388       RequestContextScopeGuard rctx(context_);
389       callback_(std::move(*result_));
390     }
391   }
392
393   void detachOne() {
394     auto a = attached_--;
395     assert(a >= 1);
396     if (a == 1) {
397       delete this;
398     }
399   }
400
401   void derefCallback() {
402     if (--callbackReferences_ == 0) {
403       callback_ = {};
404     }
405   }
406
407   folly::Function<void(Try<T>&&)> callback_;
408   // place result_ next to increase the likelihood that the value will be
409   // contained entirely in one cache line
410   folly::Optional<Try<T>> result_;
411   FSM<State> fsm_;
412   std::atomic<unsigned char> attached_;
413   std::atomic<unsigned char> callbackReferences_{0};
414   std::atomic<bool> active_ {true};
415   std::atomic<bool> interruptHandlerSet_ {false};
416   folly::MicroSpinLock interruptLock_ {0};
417   folly::MicroSpinLock executorLock_ {0};
418   int8_t priority_ {-1};
419   Executor* executor_ {nullptr};
420   std::shared_ptr<RequestContext> context_ {nullptr};
421   std::unique_ptr<exception_wrapper> interrupt_ {};
422   std::function<void(exception_wrapper const&)> interruptHandler_ {nullptr};
423 };
424
425 template <typename... Ts>
426 struct CollectAllVariadicContext {
427   CollectAllVariadicContext() {}
428   template <typename T, size_t I>
429   inline void setPartialResult(Try<T>& t) {
430     std::get<I>(results) = std::move(t);
431   }
432   ~CollectAllVariadicContext() {
433     p.setValue(std::move(results));
434   }
435   Promise<std::tuple<Try<Ts>...>> p;
436   std::tuple<Try<Ts>...> results;
437   typedef Future<std::tuple<Try<Ts>...>> type;
438 };
439
440 template <typename... Ts>
441 struct CollectVariadicContext {
442   CollectVariadicContext() {}
443   template <typename T, size_t I>
444   inline void setPartialResult(Try<T>& t) {
445     if (t.hasException()) {
446        if (!threw.exchange(true)) {
447          p.setException(std::move(t.exception()));
448        }
449      } else if (!threw) {
450        std::get<I>(results) = std::move(t);
451      }
452   }
453   ~CollectVariadicContext() noexcept {
454     if (!threw.exchange(true)) {
455       p.setValue(unwrapTryTuple(std::move(results)));
456     }
457   }
458   Promise<std::tuple<Ts...>> p;
459   std::tuple<folly::Try<Ts>...> results;
460   std::atomic<bool> threw {false};
461   typedef Future<std::tuple<Ts...>> type;
462 };
463
464 template <template <typename...> class T, typename... Ts>
465 void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& /* ctx */) {
466   // base case
467 }
468
469 template <template <typename ...> class T, typename... Ts,
470           typename THead, typename... TTail>
471 void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& ctx,
472                            THead&& head, TTail&&... tail) {
473   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
474     ctx->template setPartialResult<typename THead::value_type,
475                                    sizeof...(Ts) - sizeof...(TTail) - 1>(t);
476   });
477   // template tail-recursion
478   collectVariadicHelper(ctx, std::forward<TTail>(tail)...);
479 }
480
481 }} // folly::detail