2017
[folly.git] / folly / futures / detail / Core.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <mutex>
21 #include <stdexcept>
22 #include <vector>
23
24 #include <folly/Executor.h>
25 #include <folly/Function.h>
26 #include <folly/MicroSpinLock.h>
27 #include <folly/Optional.h>
28 #include <folly/ScopeGuard.h>
29 #include <folly/Try.h>
30 #include <folly/futures/Future.h>
31 #include <folly/futures/Promise.h>
32 #include <folly/futures/detail/FSM.h>
33
34 #include <folly/io/async/Request.h>
35
36 namespace folly { namespace detail {
37
38 /*
39         OnlyCallback
40        /            \
41   Start              Armed - Done
42        \            /
43          OnlyResult
44
45 This state machine is fairly self-explanatory. The most important bit is
46 that the callback is only executed on the transition from Armed to Done,
47 and that transition can happen immediately after transitioning from Only*
48 to Armed, if it is active (the usual case).
49 */
50 enum class State : uint8_t {
51   Start,
52   OnlyResult,
53   OnlyCallback,
54   Armed,
55   Done,
56 };
57
58 /// The shared state object for Future and Promise.
59 /// Some methods must only be called by either the Future thread or the
60 /// Promise thread. The Future thread is the thread that currently "owns" the
61 /// Future and its callback-related operations, and the Promise thread is
62 /// likewise the thread that currently "owns" the Promise and its
63 /// result-related operations. Also, Futures own interruption, Promises own
64 /// interrupt handlers. Unfortunately, there are things that users can do to
65 /// break this, and we can't detect that. However if they follow move
66 /// semantics religiously wrt threading, they should be ok.
67 ///
68 /// It's worth pointing out that Futures and/or Promises can and usually will
69 /// migrate between threads, though this usually happens within the API code.
70 /// For example, an async operation will probably make a Promise, grab its
71 /// Future, then move the Promise into another thread that will eventually
72 /// fulfill it. With executors and via, this gets slightly more complicated at
73 /// first blush, but it's the same principle. In general, as long as the user
74 /// doesn't access a Future or Promise object from more than one thread at a
75 /// time there won't be any problems.
76 template<typename T>
77 class Core final {
78   static_assert(!std::is_void<T>::value,
79                 "void futures are not supported. Use Unit instead.");
80  public:
81   /// This must be heap-constructed. There's probably a way to enforce that in
82   /// code but since this is just internal detail code and I don't know how
83   /// off-hand, I'm punting.
84   Core() : result_(), fsm_(State::Start), attached_(2) {}
85
86   explicit Core(Try<T>&& t)
87     : result_(std::move(t)),
88       fsm_(State::OnlyResult),
89       attached_(1) {}
90
91   ~Core() {
92     DCHECK(attached_ == 0);
93   }
94
95   // not copyable
96   Core(Core const&) = delete;
97   Core& operator=(Core const&) = delete;
98
99   // not movable (see comment in the implementation of Future::then)
100   Core(Core&&) noexcept = delete;
101   Core& operator=(Core&&) = delete;
102
103   // Core is assumed to be convertible only if the type is convertible
104   // and the size is the same. This is a compromise for the complexity
105   // of having to make Core truly have a conversion constructor which
106   // would cause various other problems.
107   // If we made Core move constructible then we would need to update the
108   // Promise and Future with the location of the new Core. This is complex
109   // and may be inefficient.
110   // Core should only be modified so that for size(T) == size(U),
111   // sizeof(Core<T>) == size(Core<U>).
112   // This assumption is used as a proxy to make sure that
113   // the members of Core<T> and Core<U> line up so that we can use a
114   // reinterpret cast.
115   template <
116       class U,
117       typename = typename std::enable_if<std::is_convertible<U, T>::value &&
118                                          sizeof(U) == sizeof(T)>::type>
119   static Core<T>* convert(Core<U>* from) {
120     return reinterpret_cast<Core<T>*>(from);
121   }
122
123   /// May call from any thread
124   bool hasResult() const {
125     switch (fsm_.getState()) {
126       case State::OnlyResult:
127       case State::Armed:
128       case State::Done:
129         assert(!!result_);
130         return true;
131
132       default:
133         return false;
134     }
135   }
136
137   /// May call from any thread
138   bool ready() const {
139     return hasResult();
140   }
141
142   /// May call from any thread
143   Try<T>& getTry() {
144     if (ready()) {
145       return *result_;
146     } else {
147       throw FutureNotReady();
148     }
149   }
150
151   /// Call only from Future thread.
152   template <typename F>
153   void setCallback(F&& func) {
154     bool transitionToArmed = false;
155     auto setCallback_ = [&]{
156       context_ = RequestContext::saveContext();
157       callback_ = std::forward<F>(func);
158     };
159
160     FSM_START(fsm_)
161       case State::Start:
162         FSM_UPDATE(fsm_, State::OnlyCallback, setCallback_);
163         break;
164
165       case State::OnlyResult:
166         FSM_UPDATE(fsm_, State::Armed, setCallback_);
167         transitionToArmed = true;
168         break;
169
170       case State::OnlyCallback:
171       case State::Armed:
172       case State::Done:
173         throw std::logic_error("setCallback called twice");
174     FSM_END
175
176     // we could always call this, it is an optimization to only call it when
177     // it might be needed.
178     if (transitionToArmed) {
179       maybeCallback();
180     }
181   }
182
183   /// Call only from Promise thread
184   void setResult(Try<T>&& t) {
185     bool transitionToArmed = false;
186     auto setResult_ = [&]{ result_ = std::move(t); };
187     FSM_START(fsm_)
188       case State::Start:
189         FSM_UPDATE(fsm_, State::OnlyResult, setResult_);
190         break;
191
192       case State::OnlyCallback:
193         FSM_UPDATE(fsm_, State::Armed, setResult_);
194         transitionToArmed = true;
195         break;
196
197       case State::OnlyResult:
198       case State::Armed:
199       case State::Done:
200         throw std::logic_error("setResult called twice");
201     FSM_END
202
203     if (transitionToArmed) {
204       maybeCallback();
205     }
206   }
207
208   /// Called by a destructing Future (in the Future thread, by definition)
209   void detachFuture() {
210     activate();
211     detachOne();
212   }
213
214   /// Called by a destructing Promise (in the Promise thread, by definition)
215   void detachPromise() {
216     // detachPromise() and setResult() should never be called in parallel
217     // so we don't need to protect this.
218     if (UNLIKELY(!result_)) {
219       setResult(Try<T>(exception_wrapper(BrokenPromise(typeid(T).name()))));
220     }
221     detachOne();
222   }
223
224   /// May call from any thread
225   void deactivate() {
226     active_.store(false, std::memory_order_release);
227   }
228
229   /// May call from any thread
230   void activate() {
231     active_.store(true, std::memory_order_release);
232     maybeCallback();
233   }
234
235   /// May call from any thread
236   bool isActive() { return active_.load(std::memory_order_acquire); }
237
238   /// Call only from Future thread
239   void setExecutor(Executor* x, int8_t priority = Executor::MID_PRI) {
240     if (!executorLock_.try_lock()) {
241       executorLock_.lock();
242     }
243     executor_ = x;
244     priority_ = priority;
245     executorLock_.unlock();
246   }
247
248   void setExecutorNoLock(Executor* x, int8_t priority = Executor::MID_PRI) {
249     executor_ = x;
250     priority_ = priority;
251   }
252
253   Executor* getExecutor() {
254     return executor_;
255   }
256
257   /// Call only from Future thread
258   void raise(exception_wrapper e) {
259     if (!interruptLock_.try_lock()) {
260       interruptLock_.lock();
261     }
262     if (!interrupt_ && !hasResult()) {
263       interrupt_ = folly::make_unique<exception_wrapper>(std::move(e));
264       if (interruptHandler_) {
265         interruptHandler_(*interrupt_);
266       }
267     }
268     interruptLock_.unlock();
269   }
270
271   std::function<void(exception_wrapper const&)> getInterruptHandler() {
272     if (!interruptHandlerSet_.load(std::memory_order_acquire)) {
273       return nullptr;
274     }
275     if (!interruptLock_.try_lock()) {
276       interruptLock_.lock();
277     }
278     auto handler = interruptHandler_;
279     interruptLock_.unlock();
280     return handler;
281   }
282
283   /// Call only from Promise thread
284   void setInterruptHandler(std::function<void(exception_wrapper const&)> fn) {
285     if (!interruptLock_.try_lock()) {
286       interruptLock_.lock();
287     }
288     if (!hasResult()) {
289       if (interrupt_) {
290         fn(*interrupt_);
291       } else {
292         setInterruptHandlerNoLock(std::move(fn));
293       }
294     }
295     interruptLock_.unlock();
296   }
297
298   void setInterruptHandlerNoLock(
299       std::function<void(exception_wrapper const&)> fn) {
300     interruptHandlerSet_.store(true, std::memory_order_relaxed);
301     interruptHandler_ = std::move(fn);
302   }
303
304  private:
305   class CountedReference {
306    public:
307     ~CountedReference() {
308       if (core_) {
309         core_->detachOne();
310         core_ = nullptr;
311       }
312     }
313
314     explicit CountedReference(Core* core) noexcept : core_(core) {
315       // do not construct a CountedReference from nullptr!
316       DCHECK(core);
317
318       ++core_->attached_;
319     }
320
321     // CountedReference must be copy-constructable as long as
322     // folly::Executor::add takes a std::function
323     CountedReference(CountedReference const& o) noexcept : core_(o.core_) {
324       if (core_) {
325         ++core_->attached_;
326       }
327     }
328
329     CountedReference& operator=(CountedReference const& o) noexcept {
330       ~CountedReference();
331       new (this) CountedReference(o);
332       return *this;
333     }
334
335     CountedReference(CountedReference&& o) noexcept {
336       std::swap(core_, o.core_);
337     }
338
339     CountedReference& operator=(CountedReference&& o) noexcept {
340       ~CountedReference();
341       new (this) CountedReference(std::move(o));
342       return *this;
343     }
344
345     Core* getCore() const noexcept {
346       return core_;
347     }
348
349    private:
350     Core* core_{nullptr};
351   };
352
353   void maybeCallback() {
354     FSM_START(fsm_)
355       case State::Armed:
356         if (active_.load(std::memory_order_acquire)) {
357           FSM_UPDATE2(fsm_, State::Done, []{}, [this]{ this->doCallback(); });
358         }
359         FSM_BREAK
360
361       default:
362         FSM_BREAK
363     FSM_END
364   }
365
366   void doCallback() {
367     Executor* x = executor_;
368     int8_t priority;
369     if (x) {
370       if (!executorLock_.try_lock()) {
371         executorLock_.lock();
372       }
373       x = executor_;
374       priority = priority_;
375       executorLock_.unlock();
376     }
377
378     if (x) {
379       try {
380         if (LIKELY(x->getNumPriorities() == 1)) {
381           x->add([core_ref = CountedReference(this)]() mutable {
382             auto cr = std::move(core_ref);
383             Core* const core = cr.getCore();
384             RequestContextScopeGuard rctx(core->context_);
385             SCOPE_EXIT { core->callback_ = {}; };
386             core->callback_(std::move(*core->result_));
387           });
388         } else {
389           x->addWithPriority([core_ref = CountedReference(this)]() mutable {
390             auto cr = std::move(core_ref);
391             Core* const core = cr.getCore();
392             RequestContextScopeGuard rctx(core->context_);
393             SCOPE_EXIT { core->callback_ = {}; };
394             core->callback_(std::move(*core->result_));
395           }, priority);
396         }
397       } catch (...) {
398         CountedReference core_ref(this);
399         RequestContextScopeGuard rctx(context_);
400         result_ = Try<T>(exception_wrapper(std::current_exception()));
401         SCOPE_EXIT { callback_ = {}; };
402         callback_(std::move(*result_));
403       }
404     } else {
405       CountedReference core_ref(this);
406       RequestContextScopeGuard rctx(context_);
407       SCOPE_EXIT { callback_ = {}; };
408       callback_(std::move(*result_));
409     }
410   }
411
412   void detachOne() {
413     auto a = attached_--;
414     assert(a >= 1);
415     if (a == 1) {
416       delete this;
417     }
418   }
419
420   // Core should only be modified so that for size(T) == size(U),
421   // sizeof(Core<T>) == size(Core<U>).
422   // See Core::convert for details.
423
424   folly::Function<void(Try<T>&&)> callback_;
425   // place result_ next to increase the likelihood that the value will be
426   // contained entirely in one cache line
427   folly::Optional<Try<T>> result_;
428   FSM<State> fsm_;
429   std::atomic<unsigned char> attached_;
430   std::atomic<bool> active_ {true};
431   std::atomic<bool> interruptHandlerSet_ {false};
432   folly::MicroSpinLock interruptLock_ {0};
433   folly::MicroSpinLock executorLock_ {0};
434   int8_t priority_ {-1};
435   Executor* executor_ {nullptr};
436   std::shared_ptr<RequestContext> context_ {nullptr};
437   std::unique_ptr<exception_wrapper> interrupt_ {};
438   std::function<void(exception_wrapper const&)> interruptHandler_ {nullptr};
439 };
440
441 template <typename... Ts>
442 struct CollectAllVariadicContext {
443   CollectAllVariadicContext() {}
444   template <typename T, size_t I>
445   inline void setPartialResult(Try<T>& t) {
446     std::get<I>(results) = std::move(t);
447   }
448   ~CollectAllVariadicContext() {
449     p.setValue(std::move(results));
450   }
451   Promise<std::tuple<Try<Ts>...>> p;
452   std::tuple<Try<Ts>...> results;
453   typedef Future<std::tuple<Try<Ts>...>> type;
454 };
455
456 template <typename... Ts>
457 struct CollectVariadicContext {
458   CollectVariadicContext() {}
459   template <typename T, size_t I>
460   inline void setPartialResult(Try<T>& t) {
461     if (t.hasException()) {
462        if (!threw.exchange(true)) {
463          p.setException(std::move(t.exception()));
464        }
465      } else if (!threw) {
466        std::get<I>(results) = std::move(t);
467      }
468   }
469   ~CollectVariadicContext() noexcept {
470     if (!threw.exchange(true)) {
471       p.setValue(unwrapTryTuple(std::move(results)));
472     }
473   }
474   Promise<std::tuple<Ts...>> p;
475   std::tuple<folly::Try<Ts>...> results;
476   std::atomic<bool> threw {false};
477   typedef Future<std::tuple<Ts...>> type;
478 };
479
480 template <template <typename...> class T, typename... Ts>
481 void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& /* ctx */) {
482   // base case
483 }
484
485 template <template <typename ...> class T, typename... Ts,
486           typename THead, typename... TTail>
487 void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& ctx,
488                            THead&& head, TTail&&... tail) {
489   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
490     ctx->template setPartialResult<typename THead::value_type,
491                                    sizeof...(Ts) - sizeof...(TTail) - 1>(t);
492   });
493   // template tail-recursion
494   collectVariadicHelper(ctx, std::forward<TTail>(tail)...);
495 }
496
497 }} // folly::detail