move wangle/futures to futures
[folly.git] / folly / futures / detail / Core.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <mutex>
21 #include <stdexcept>
22 #include <vector>
23
24 #include <folly/Optional.h>
25 #include <folly/SmallLocks.h>
26
27 #include <folly/futures/Try.h>
28 #include <folly/futures/Promise.h>
29 #include <folly/futures/Future.h>
30 #include <folly/Executor.h>
31 #include <folly/futures/detail/FSM.h>
32
33 #include <folly/io/async/Request.h>
34
35 namespace folly { namespace wangle { namespace detail {
36
37 // As of GCC 4.8.1, the std::function in libstdc++ optimizes only for pointers
38 // to functions, using a helper avoids a call to malloc.
39 template<typename T>
40 void empty_callback(Try<T>&&) { }
41
42 enum class State {
43   Waiting,
44   Interruptible,
45   Interrupted,
46   Done,
47 };
48
49 /** The shared state object for Future and Promise. */
50 template<typename T>
51 class Core : protected FSM<State> {
52  public:
53   // This must be heap-constructed. There's probably a way to enforce that in
54   // code but since this is just internal detail code and I don't know how
55   // off-hand, I'm punting.
56   Core() : FSM<State>(State::Waiting) {}
57   ~Core() {
58     assert(calledBack_);
59     assert(detached_ == 2);
60   }
61
62   // not copyable
63   Core(Core const&) = delete;
64   Core& operator=(Core const&) = delete;
65
66   // not movable (see comment in the implementation of Future::then)
67   Core(Core&&) noexcept = delete;
68   Core& operator=(Core&&) = delete;
69
70   Try<T>& getTry() {
71     if (ready()) {
72       return *result_;
73     } else {
74       throw FutureNotReady();
75     }
76   }
77
78   template <typename F>
79   void setCallback(F func) {
80     auto setCallback_ = [&]{
81       if (callback_) {
82         throw std::logic_error("setCallback called twice");
83       }
84
85       context_ = RequestContext::saveContext();
86       callback_ = std::move(func);
87     };
88
89     FSM_START
90       case State::Waiting:
91       case State::Interruptible:
92       case State::Interrupted:
93         FSM_UPDATE(state, setCallback_);
94         break;
95
96       case State::Done:
97         FSM_UPDATE2(State::Done,
98           setCallback_,
99           [&]{ maybeCallback(); });
100         break;
101     FSM_END
102   }
103
104   void setResult(Try<T>&& t) {
105     FSM_START
106       case State::Waiting:
107       case State::Interruptible:
108       case State::Interrupted:
109         FSM_UPDATE2(State::Done,
110           [&]{ result_ = std::move(t); },
111           [&]{ maybeCallback(); });
112         break;
113
114       case State::Done:
115         throw std::logic_error("setResult called twice");
116     FSM_END
117   }
118
119   bool ready() const {
120     return getState() == State::Done;
121   }
122
123   // Called by a destructing Future
124   void detachFuture() {
125     if (!callback_) {
126       setCallback(empty_callback<T>);
127     }
128     activate();
129     detachOne();
130   }
131
132   // Called by a destructing Promise
133   void detachPromise() {
134     if (!ready()) {
135       setResult(Try<T>(exception_wrapper(BrokenPromise())));
136     }
137     detachOne();
138   }
139
140   void deactivate() {
141     active_ = false;
142   }
143
144   void activate() {
145     active_ = true;
146     if (ready()) {
147       maybeCallback();
148     }
149   }
150
151   bool isActive() { return active_; }
152
153   void setExecutor(Executor* x) {
154     executor_ = x;
155   }
156
157   void raise(exception_wrapper const& e) {
158     FSM_START
159       case State::Interruptible:
160         FSM_UPDATE2(State::Interrupted,
161           [&]{ interrupt_ = folly::make_unique<exception_wrapper>(e); },
162           [&]{ interruptHandler_(*interrupt_); });
163         break;
164
165       case State::Waiting:
166       case State::Interrupted:
167         FSM_UPDATE(State::Interrupted,
168           [&]{ interrupt_ = folly::make_unique<exception_wrapper>(e); });
169         break;
170
171       case State::Done:
172         FSM_BREAK
173     FSM_END
174   }
175
176   void setInterruptHandler(std::function<void(exception_wrapper const&)> fn) {
177     FSM_START
178       case State::Waiting:
179       case State::Interruptible:
180         FSM_UPDATE(State::Interruptible,
181           [&]{ interruptHandler_ = std::move(fn); });
182         break;
183
184       case State::Interrupted:
185         fn(*interrupt_);
186         FSM_BREAK
187
188       case State::Done:
189         FSM_BREAK
190     FSM_END
191   }
192
193  private:
194   void maybeCallback() {
195     assert(ready());
196     if (isActive() && callback_) {
197       if (!calledBack_.exchange(true)) {
198         // TODO(5306911) we should probably try/catch
199         Executor* x = executor_;
200
201         RequestContext::setContext(context_);
202         if (x) {
203           MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
204           MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
205           x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
206         } else {
207           callback_(std::move(*result_));
208         }
209       }
210     }
211   }
212
213   void detachOne() {
214     auto d = ++detached_;
215     assert(d >= 1);
216     assert(d <= 2);
217     if (d == 2) {
218       // we should have already executed the callback with the value
219       assert(calledBack_);
220       delete this;
221     }
222   }
223
224   folly::Optional<Try<T>> result_;
225   std::function<void(Try<T>&&)> callback_;
226   std::shared_ptr<RequestContext> context_{nullptr};
227   std::atomic<bool> calledBack_ {false};
228   std::atomic<unsigned char> detached_ {0};
229   std::atomic<bool> active_ {true};
230   std::atomic<Executor*> executor_ {nullptr};
231   std::unique_ptr<exception_wrapper> interrupt_;
232   std::function<void(exception_wrapper const&)> interruptHandler_;
233 };
234
235 template <typename... Ts>
236 struct VariadicContext {
237   VariadicContext() : total(0), count(0) {}
238   Promise<std::tuple<Try<Ts>... > > p;
239   std::tuple<Try<Ts>... > results;
240   size_t total;
241   std::atomic<size_t> count;
242   typedef Future<std::tuple<Try<Ts>...>> type;
243 };
244
245 template <typename... Ts, typename THead, typename... Fs>
246 typename std::enable_if<sizeof...(Fs) == 0, void>::type
247 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
248   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
249     std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
250     if (++ctx->count == ctx->total) {
251       ctx->p.setValue(std::move(ctx->results));
252       delete ctx;
253     }
254   });
255 }
256
257 template <typename... Ts, typename THead, typename... Fs>
258 typename std::enable_if<sizeof...(Fs) != 0, void>::type
259 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
260   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
261     std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
262     if (++ctx->count == ctx->total) {
263       ctx->p.setValue(std::move(ctx->results));
264       delete ctx;
265     }
266   });
267   // template tail-recursion
268   whenAllVariadicHelper(ctx, std::forward<Fs>(tail)...);
269 }
270
271 template <typename T>
272 struct WhenAllContext {
273   WhenAllContext() : count(0) {}
274   Promise<std::vector<Try<T> > > p;
275   std::vector<Try<T> > results;
276   std::atomic<size_t> count;
277 };
278
279 template <typename T>
280 struct WhenAnyContext {
281   explicit WhenAnyContext(size_t n) : done(false), ref_count(n) {};
282   Promise<std::pair<size_t, Try<T>>> p;
283   std::atomic<bool> done;
284   std::atomic<size_t> ref_count;
285   void decref() {
286     if (--ref_count == 0) {
287       delete this;
288     }
289   }
290 };
291
292 }}} // namespace