save/restore request context in future
[folly.git] / folly / wangle / detail / Core.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <mutex>
21 #include <stdexcept>
22 #include <vector>
23
24 #include <folly/Optional.h>
25 #include <folly/SmallLocks.h>
26
27 #include <folly/wangle/Try.h>
28 #include <folly/wangle/Promise.h>
29 #include <folly/wangle/Future.h>
30 #include <folly/wangle/Executor.h>
31 #include <folly/wangle/detail/FSM.h>
32
33 #include <folly/io/async/Request.h>
34
35 namespace folly { namespace wangle { namespace detail {
36
37 // As of GCC 4.8.1, the std::function in libstdc++ optimizes only for pointers
38 // to functions, using a helper avoids a call to malloc.
39 template<typename T>
40 void empty_callback(Try<T>&&) { }
41
42 enum class State {
43   Waiting,
44   Interruptible,
45   Interrupted,
46   Done,
47 };
48
49 /** The shared state object for Future and Promise. */
50 template<typename T>
51 class Core : protected FSM<State> {
52  public:
53   // This must be heap-constructed. There's probably a way to enforce that in
54   // code but since this is just internal detail code and I don't know how
55   // off-hand, I'm punting.
56   Core() : FSM<State>(State::Waiting) {}
57   ~Core() {
58     assert(calledBack_);
59     assert(detached_ == 2);
60   }
61
62   // not copyable
63   Core(Core const&) = delete;
64   Core& operator=(Core const&) = delete;
65
66   // not movable (see comment in the implementation of Future::then)
67   Core(Core&&) noexcept = delete;
68   Core& operator=(Core&&) = delete;
69
70   Try<T>& getTry() {
71     if (ready()) {
72       return *result_;
73     } else {
74       throw FutureNotReady();
75     }
76   }
77
78   template <typename F>
79   void setCallback(F func) {
80     auto setCallback_ = [&]{
81       if (callback_) {
82         throw std::logic_error("setCallback called twice");
83       }
84
85       context_ = RequestContext::saveContext();
86       callback_ = std::move(func);
87     };
88
89     FSM_START
90       case State::Waiting:
91       case State::Interruptible:
92       case State::Interrupted:
93         FSM_UPDATE(state, setCallback_);
94         break;
95
96       case State::Done:
97         FSM_UPDATE2(State::Done,
98           setCallback_,
99           [&]{ maybeCallback(); });
100         break;
101     FSM_END
102   }
103
104   void setResult(Try<T>&& t) {
105     FSM_START
106       case State::Waiting:
107       case State::Interruptible:
108       case State::Interrupted:
109         FSM_UPDATE2(State::Done,
110           [&]{ result_ = std::move(t); },
111           [&]{ maybeCallback(); });
112         break;
113
114       case State::Done:
115         throw std::logic_error("setResult called twice");
116     FSM_END
117   }
118
119   bool ready() const {
120     return getState() == State::Done;
121   }
122
123   // Called by a destructing Future
124   void detachFuture() {
125     if (!callback_) {
126       setCallback(empty_callback<T>);
127     }
128     activate();
129     detachOne();
130   }
131
132   // Called by a destructing Promise
133   void detachPromise() {
134     if (!ready()) {
135       setResult(Try<T>(std::make_exception_ptr(BrokenPromise())));
136     }
137     detachOne();
138   }
139
140   void deactivate() {
141     active_ = false;
142   }
143
144   void activate() {
145     active_ = true;
146     if (ready()) {
147       maybeCallback();
148     }
149   }
150
151   bool isActive() { return active_; }
152
153   void setExecutor(Executor* x) {
154     executor_ = x;
155   }
156
157   void raise(std::exception_ptr const& e) {
158     FSM_START
159       case State::Interruptible:
160         FSM_UPDATE2(State::Interrupted,
161           [&]{ interrupt_ = e; },
162           [&]{ interruptHandler_(interrupt_); });
163         break;
164
165       case State::Waiting:
166       case State::Interrupted:
167         FSM_UPDATE(State::Interrupted,
168           [&]{ interrupt_ = e; });
169         break;
170
171       case State::Done:
172         FSM_BREAK
173     FSM_END
174   }
175
176   void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
177     FSM_START
178       case State::Waiting:
179       case State::Interruptible:
180         FSM_UPDATE(State::Interruptible,
181           [&]{ interruptHandler_ = std::move(fn); });
182         break;
183
184       case State::Interrupted:
185         fn(interrupt_);
186         FSM_BREAK
187
188       case State::Done:
189         FSM_BREAK
190     FSM_END
191   }
192
193  private:
194   void maybeCallback() {
195     assert(ready());
196     if (!calledBack_ && isActive() && callback_) {
197       // TODO(5306911) we should probably try/catch
198       calledBack_ = true;
199       Executor* x = executor_;
200
201       RequestContext::setContext(context_);
202       if (x) {
203         MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
204         MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
205         x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
206       } else {
207         callback_(std::move(*result_));
208       }
209     }
210   }
211
212   void detachOne() {
213     auto d = ++detached_;
214     assert(d >= 1);
215     assert(d <= 2);
216     if (d == 2) {
217       // we should have already executed the callback with the value
218       assert(calledBack_);
219       delete this;
220     }
221   }
222
223   folly::Optional<Try<T>> result_;
224   std::function<void(Try<T>&&)> callback_;
225   std::shared_ptr<RequestContext> context_{nullptr};
226   std::atomic<bool> calledBack_ {false};
227   std::atomic<unsigned char> detached_ {0};
228   std::atomic<bool> active_ {true};
229   std::atomic<Executor*> executor_ {nullptr};
230   std::exception_ptr interrupt_;
231   std::function<void(std::exception_ptr const&)> interruptHandler_;
232 };
233
234 template <typename... Ts>
235 struct VariadicContext {
236   VariadicContext() : total(0), count(0) {}
237   Promise<std::tuple<Try<Ts>... > > p;
238   std::tuple<Try<Ts>... > results;
239   size_t total;
240   std::atomic<size_t> count;
241   typedef Future<std::tuple<Try<Ts>...>> type;
242 };
243
244 template <typename... Ts, typename THead, typename... Fs>
245 typename std::enable_if<sizeof...(Fs) == 0, void>::type
246 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
247   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
248     std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
249     if (++ctx->count == ctx->total) {
250       ctx->p.setValue(std::move(ctx->results));
251       delete ctx;
252     }
253   });
254 }
255
256 template <typename... Ts, typename THead, typename... Fs>
257 typename std::enable_if<sizeof...(Fs) != 0, void>::type
258 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
259   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
260     std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
261     if (++ctx->count == ctx->total) {
262       ctx->p.setValue(std::move(ctx->results));
263       delete ctx;
264     }
265   });
266   // template tail-recursion
267   whenAllVariadicHelper(ctx, std::forward<Fs>(tail)...);
268 }
269
270 template <typename T>
271 struct WhenAllContext {
272   explicit WhenAllContext() : count(0), total(0) {}
273   Promise<std::vector<Try<T> > > p;
274   std::vector<Try<T> > results;
275   std::atomic<size_t> count;
276   size_t total;
277 };
278
279 template <typename T>
280 struct WhenAnyContext {
281   explicit WhenAnyContext(size_t n) : done(false), ref_count(n) {};
282   Promise<std::pair<size_t, Try<T>>> p;
283   std::atomic<bool> done;
284   std::atomic<size_t> ref_count;
285   void decref() {
286     if (--ref_count == 0) {
287       delete this;
288     }
289   }
290 };
291
292 template <typename T>
293 struct WhenAllLaterContext {
294   explicit WhenAllLaterContext() : count(0), total(0) {}
295   std::function<void(std::vector<Try<T>>&&)> fn;
296   std::vector<Try<T> > results;
297   std::atomic<size_t> count;
298   size_t total;
299 };
300
301 }}} // namespace