9a0fe9974ad74652156f2a2ffc4768b3a0ea50f3
[folly.git] / folly / wangle / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/wangle/detail/State.h>
23 #include <folly/Baton.h>
24
25 namespace folly { namespace wangle {
26
27 template <typename T>
28 struct isFuture {
29   static const bool value = false;
30 };
31
32 template <typename T>
33 struct isFuture<Future<T> > {
34   static const bool value = true;
35 };
36
37 template <class T>
38 Future<T>::Future(Future<T>&& other) noexcept : state_(nullptr) {
39   *this = std::move(other);
40 }
41
42 template <class T>
43 Future<T>& Future<T>::operator=(Future<T>&& other) {
44   std::swap(state_, other.state_);
45   return *this;
46 }
47
48 template <class T>
49 Future<T>::~Future() {
50   detach();
51 }
52
53 template <class T>
54 void Future<T>::detach() {
55   if (state_) {
56     state_->detachFuture();
57     state_ = nullptr;
58   }
59 }
60
61 template <class T>
62 void Future<T>::throwIfInvalid() const {
63   if (!state_)
64     throw NoState();
65 }
66
67 template <class T>
68 template <class F>
69 void Future<T>::setCallback_(F&& func) {
70   throwIfInvalid();
71   state_->setCallback(std::move(func));
72 }
73
74 template <class T>
75 template <class F>
76 typename std::enable_if<
77   !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
78   Future<typename std::result_of<F(Try<T>&&)>::type> >::type
79 Future<T>::then(F&& func) {
80   typedef typename std::result_of<F(Try<T>&&)>::type B;
81
82   throwIfInvalid();
83
84   // wrap these so we can move them into the lambda
85   folly::MoveWrapper<Promise<B>> p;
86   folly::MoveWrapper<F> funcm(std::forward<F>(func));
87
88   // grab the Future now before we lose our handle on the Promise
89   auto f = p->getFuture();
90
91   /* This is a bit tricky.
92
93      We can't just close over *this in case this Future gets moved. So we
94      make a new dummy Future. We could figure out something more
95      sophisticated that avoids making a new Future object when it can, as an
96      optimization. But this is correct.
97
98      state_ can't be moved, it is explicitly disallowed (as is copying). But
99      if there's ever a reason to allow it, this is one place that makes that
100      assumption and would need to be fixed. We use a standard shared pointer
101      for state_ (by copying it in), which means in essence obj holds a shared
102      pointer to itself.  But this shouldn't leak because Promise will not
103      outlive the continuation, because Promise will setException() with a
104      broken Promise if it is destructed before completed. We could use a
105      weak pointer but it would have to be converted to a shared pointer when
106      func is executed (because the Future returned by func may possibly
107      persist beyond the callback, if it gets moved), and so it is an
108      optimization to just make it shared from the get-go.
109
110      We have to move in the Promise and func using the MoveWrapper
111      hack. (func could be copied but it's a big drag on perf).
112
113      Two subtle but important points about this design. detail::State has no
114      back pointers to Future or Promise, so if Future or Promise get moved
115      (and they will be moved in performant code) we don't have to do
116      anything fancy. And because we store the continuation in the
117      detail::State, not in the Future, we can execute the continuation even
118      after the Future has gone out of scope. This is an intentional design
119      decision. It is likely we will want to be able to cancel a continuation
120      in some circumstances, but I think it should be explicit not implicit
121      in the destruction of the Future used to create it.
122      */
123   setCallback_(
124     [p, funcm](Try<T>&& t) mutable {
125       p->fulfil([&]() {
126           return (*funcm)(std::move(t));
127         });
128     });
129
130   return std::move(f);
131 }
132
133 template <class T>
134 template <class F>
135 typename std::enable_if<
136   isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
137   Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
138 Future<T>::then(F&& func) {
139   typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
140
141   throwIfInvalid();
142
143   // wrap these so we can move them into the lambda
144   folly::MoveWrapper<Promise<B>> p;
145   folly::MoveWrapper<F> funcm(std::forward<F>(func));
146
147   // grab the Future now before we lose our handle on the Promise
148   auto f = p->getFuture();
149
150   setCallback_(
151     [p, funcm](Try<T>&& t) mutable {
152       try {
153         auto f2 = (*funcm)(std::move(t));
154         // that didn't throw, now we can steal p
155         f2.setCallback_([p](Try<B>&& b) mutable {
156             p->fulfilTry(std::move(b));
157           });
158       } catch (...) {
159         p->setException(std::current_exception());
160       }
161     });
162
163   return std::move(f);
164 }
165
166 template <class T>
167 Future<void> Future<T>::then() {
168   return then([] (Try<T>&& t) {});
169 }
170
171 template <class T>
172 typename std::add_lvalue_reference<T>::type Future<T>::value() {
173   throwIfInvalid();
174
175   return state_->value();
176 }
177
178 template <class T>
179 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
180   throwIfInvalid();
181
182   return state_->value();
183 }
184
185 template <class T>
186 Try<T>& Future<T>::getTry() {
187   throwIfInvalid();
188
189   return state_->getTry();
190 }
191
192 template <class T>
193 template <typename Executor>
194 inline Future<T> Future<T>::via(Executor* executor) {
195   throwIfInvalid();
196   auto f = then([=](Try<T>&& t) {
197     MoveWrapper<Promise<T>> promise;
198     MoveWrapper<Try<T>> tw(std::move(t));
199     auto f2 = promise->getFuture();
200     executor->add([=]() mutable { promise->fulfilTry(std::move(*tw)); });
201     return f2;
202   });
203   f.deactivate();
204   return f;
205 }
206
207 template <class T>
208 bool Future<T>::isReady() const {
209   throwIfInvalid();
210   return state_->ready();
211 }
212
213 // makeFuture
214
215 template <class T>
216 Future<typename std::decay<T>::type> makeFuture(T&& t) {
217   Promise<typename std::decay<T>::type> p;
218   auto f = p.getFuture();
219   p.setValue(std::forward<T>(t));
220   return std::move(f);
221 }
222
223 inline // for multiple translation units
224 Future<void> makeFuture() {
225   Promise<void> p;
226   auto f = p.getFuture();
227   p.setValue();
228   return std::move(f);
229 }
230
231 template <class F>
232 auto makeFutureTry(
233     F&& func,
234     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
235     -> Future<decltype(func())> {
236   Promise<decltype(func())> p;
237   auto f = p.getFuture();
238   p.fulfil(
239     [&func]() {
240       return (func)();
241     });
242   return std::move(f);
243 }
244
245 template <class F>
246 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
247   F copy = func;
248   return makeFutureTry(std::move(copy));
249 }
250
251 template <class T>
252 Future<T> makeFuture(std::exception_ptr const& e) {
253   Promise<T> p;
254   auto f = p.getFuture();
255   p.setException(e);
256   return std::move(f);
257 }
258
259 template <class T, class E>
260 typename std::enable_if<std::is_base_of<std::exception, E>::value,
261                         Future<T>>::type
262 makeFuture(E const& e) {
263   Promise<T> p;
264   auto f = p.getFuture();
265   p.fulfil([&]() -> T { throw e; });
266   return std::move(f);
267 }
268
269 template <class T>
270 Future<T> makeFuture(Try<T>&& t) {
271   try {
272     return makeFuture<T>(std::move(t.value()));
273   } catch (...) {
274     return makeFuture<T>(std::current_exception());
275   }
276 }
277
278 template <>
279 inline Future<void> makeFuture(Try<void>&& t) {
280   try {
281     t.throwIfFailed();
282     return makeFuture();
283   } catch (...) {
284     return makeFuture<void>(std::current_exception());
285   }
286 }
287
288 // when (variadic)
289
290 template <typename... Fs>
291 typename detail::VariadicContext<
292   typename std::decay<Fs>::type::value_type...>::type
293 whenAll(Fs&&... fs)
294 {
295   auto ctx =
296     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
297   ctx->total = sizeof...(fs);
298   auto f_saved = ctx->p.getFuture();
299   detail::whenAllVariadicHelper(ctx,
300     std::forward<typename std::decay<Fs>::type>(fs)...);
301   return std::move(f_saved);
302 }
303
304 // when (iterator)
305
306 template <class InputIterator>
307 Future<
308   std::vector<
309   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
310 whenAll(InputIterator first, InputIterator last)
311 {
312   typedef
313     typename std::iterator_traits<InputIterator>::value_type::value_type T;
314
315   auto n = std::distance(first, last);
316   if (n == 0) {
317     return makeFuture(std::vector<Try<T>>());
318   }
319
320   auto ctx = new detail::WhenAllContext<T>();
321
322   ctx->total = n;
323   ctx->results.resize(ctx->total);
324
325   auto f_saved = ctx->p.getFuture();
326
327   for (size_t i = 0; first != last; ++first, ++i) {
328      auto& f = *first;
329      f.setCallback_([ctx, i](Try<T>&& t) {
330          ctx->results[i] = std::move(t);
331          if (++ctx->count == ctx->total) {
332            ctx->p.setValue(std::move(ctx->results));
333            delete ctx;
334          }
335        });
336   }
337
338   return std::move(f_saved);
339 }
340
341 template <class InputIterator>
342 Future<
343   std::pair<size_t,
344             Try<
345               typename
346               std::iterator_traits<InputIterator>::value_type::value_type> > >
347 whenAny(InputIterator first, InputIterator last) {
348   typedef
349     typename std::iterator_traits<InputIterator>::value_type::value_type T;
350
351   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
352   auto f_saved = ctx->p.getFuture();
353
354   for (size_t i = 0; first != last; first++, i++) {
355     auto& f = *first;
356     f.setCallback_([i, ctx](Try<T>&& t) {
357       if (!ctx->done.exchange(true)) {
358         ctx->p.setValue(std::make_pair(i, std::move(t)));
359       }
360       ctx->decref();
361     });
362   }
363
364   return std::move(f_saved);
365 }
366
367 template <class InputIterator>
368 Future<std::vector<std::pair<size_t, Try<typename
369   std::iterator_traits<InputIterator>::value_type::value_type>>>>
370 whenN(InputIterator first, InputIterator last, size_t n) {
371   typedef typename
372     std::iterator_traits<InputIterator>::value_type::value_type T;
373   typedef std::vector<std::pair<size_t, Try<T>>> V;
374
375   struct ctx_t {
376     V v;
377     size_t completed;
378     Promise<V> p;
379   };
380   auto ctx = std::make_shared<ctx_t>();
381   ctx->completed = 0;
382
383   // for each completed Future, increase count and add to vector, until we
384   // have n completed futures at which point we fulfil our Promise with the
385   // vector
386   auto it = first;
387   size_t i = 0;
388   while (it != last) {
389     it->then([ctx, n, i](Try<T>&& t) {
390       auto& v = ctx->v;
391       auto c = ++ctx->completed;
392       if (c <= n) {
393         assert(ctx->v.size() < n);
394         v.push_back(std::make_pair(i, std::move(t)));
395         if (c == n) {
396           ctx->p.fulfilTry(Try<V>(std::move(v)));
397         }
398       }
399     });
400
401     it++;
402     i++;
403   }
404
405   if (i < n) {
406     ctx->p.setException(std::runtime_error("Not enough futures"));
407   }
408
409   return ctx->p.getFuture();
410 }
411
412 template <typename T>
413 Future<T>
414 waitWithSemaphore(Future<T>&& f) {
415   Baton<> baton;
416   auto done = f.then([&](Try<T> &&t) {
417     baton.post();
418     return std::move(t.value());
419   });
420   baton.wait();
421   while (!done.isReady()) {
422     // There's a race here between the return here and the actual finishing of
423     // the future. f is completed, but the setup may not have finished on done
424     // after the baton has posted.
425     std::this_thread::yield();
426   }
427   return done;
428 }
429
430 template<>
431 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
432   Baton<> baton;
433   auto done = f.then([&](Try<void> &&t) {
434     baton.post();
435     t.value();
436   });
437   baton.wait();
438   while (!done.isReady()) {
439     // There's a race here between the return here and the actual finishing of
440     // the future. f is completed, but the setup may not have finished on done
441     // after the baton has posted.
442     std::this_thread::yield();
443   }
444   return done;
445 }
446
447 template <typename T, class Duration>
448 Future<T>
449 waitWithSemaphore(Future<T>&& f, Duration timeout) {
450   auto baton = std::make_shared<Baton<>>();
451   auto done = f.then([baton](Try<T> &&t) {
452     baton->post();
453     return std::move(t.value());
454   });
455   baton->timed_wait(std::chrono::system_clock::now() + timeout);
456   return done;
457 }
458
459 template <class Duration>
460 Future<void>
461 waitWithSemaphore(Future<void>&& f, Duration timeout) {
462   auto baton = std::make_shared<Baton<>>();
463   auto done = f.then([baton](Try<void> &&t) {
464     baton->post();
465     t.value();
466   });
467   baton->timed_wait(std::chrono::system_clock::now() + timeout);
468   return done;
469 }
470
471 }}
472
473 // I haven't included a Future<T&> specialization because I don't forsee us
474 // using it, however it is not difficult to add when needed. Refer to
475 // Future<void> for guidance. std::future and boost::future code would also be
476 // instructive.