f404067b8afab3a39176f4addc2885933c303a63
[folly.git] / folly / wangle / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/wangle/detail/State.h>
23 #include <folly/Baton.h>
24
25 namespace folly { namespace wangle {
26
27 template <typename T>
28 struct isFuture {
29   static const bool value = false;
30 };
31
32 template <typename T>
33 struct isFuture<Future<T> > {
34   static const bool value = true;
35 };
36
37 template <class T>
38 Future<T>::Future(Future<T>&& other) noexcept : state_(nullptr) {
39   *this = std::move(other);
40 }
41
42 template <class T>
43 Future<T>& Future<T>::operator=(Future<T>&& other) {
44   std::swap(state_, other.state_);
45   return *this;
46 }
47
48 template <class T>
49 Future<T>::~Future() {
50   detach();
51 }
52
53 template <class T>
54 void Future<T>::detach() {
55   if (state_) {
56     state_->detachFuture();
57     state_ = nullptr;
58   }
59 }
60
61 template <class T>
62 void Future<T>::throwIfInvalid() const {
63   if (!state_)
64     throw NoState();
65 }
66
67 template <class T>
68 template <class F>
69 void Future<T>::setCallback_(F&& func) {
70   throwIfInvalid();
71   state_->setCallback(std::move(func));
72 }
73
74 template <class T>
75 template <class F>
76 typename std::enable_if<
77   !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
78   Future<typename std::result_of<F(Try<T>&&)>::type> >::type
79 Future<T>::then(F&& func) {
80   typedef typename std::result_of<F(Try<T>&&)>::type B;
81
82   throwIfInvalid();
83
84   // wrap these so we can move them into the lambda
85   folly::MoveWrapper<Promise<B>> p;
86   folly::MoveWrapper<F> funcm(std::forward<F>(func));
87
88   // grab the Future now before we lose our handle on the Promise
89   auto f = p->getFuture();
90
91   /* This is a bit tricky.
92
93      We can't just close over *this in case this Future gets moved. So we
94      make a new dummy Future. We could figure out something more
95      sophisticated that avoids making a new Future object when it can, as an
96      optimization. But this is correct.
97
98      state_ can't be moved, it is explicitly disallowed (as is copying). But
99      if there's ever a reason to allow it, this is one place that makes that
100      assumption and would need to be fixed. We use a standard shared pointer
101      for state_ (by copying it in), which means in essence obj holds a shared
102      pointer to itself.  But this shouldn't leak because Promise will not
103      outlive the continuation, because Promise will setException() with a
104      broken Promise if it is destructed before completed. We could use a
105      weak pointer but it would have to be converted to a shared pointer when
106      func is executed (because the Future returned by func may possibly
107      persist beyond the callback, if it gets moved), and so it is an
108      optimization to just make it shared from the get-go.
109
110      We have to move in the Promise and func using the MoveWrapper
111      hack. (func could be copied but it's a big drag on perf).
112
113      Two subtle but important points about this design. detail::State has no
114      back pointers to Future or Promise, so if Future or Promise get moved
115      (and they will be moved in performant code) we don't have to do
116      anything fancy. And because we store the continuation in the
117      detail::State, not in the Future, we can execute the continuation even
118      after the Future has gone out of scope. This is an intentional design
119      decision. It is likely we will want to be able to cancel a continuation
120      in some circumstances, but I think it should be explicit not implicit
121      in the destruction of the Future used to create it.
122      */
123   setCallback_(
124     [p, funcm](Try<T>&& t) mutable {
125       p->fulfil([&]() {
126           return (*funcm)(std::move(t));
127         });
128     });
129
130   return std::move(f);
131 }
132
133 template <class T>
134 template <class F>
135 typename std::enable_if<
136   isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
137   Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
138 Future<T>::then(F&& func) {
139   typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
140
141   throwIfInvalid();
142
143   // wrap these so we can move them into the lambda
144   folly::MoveWrapper<Promise<B>> p;
145   folly::MoveWrapper<F> funcm(std::forward<F>(func));
146
147   // grab the Future now before we lose our handle on the Promise
148   auto f = p->getFuture();
149
150   setCallback_(
151     [p, funcm](Try<T>&& t) mutable {
152       try {
153         auto f2 = (*funcm)(std::move(t));
154         // that didn't throw, now we can steal p
155         f2.setCallback_([p](Try<B>&& b) mutable {
156             p->fulfilTry(std::move(b));
157           });
158       } catch (...) {
159         p->setException(std::current_exception());
160       }
161     });
162
163   return std::move(f);
164 }
165
166 template <class T>
167 Future<void> Future<T>::then() {
168   return then([] (Try<T>&& t) {});
169 }
170
171 template <class T>
172 typename std::add_lvalue_reference<T>::type Future<T>::value() {
173   throwIfInvalid();
174
175   return state_->value();
176 }
177
178 template <class T>
179 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
180   throwIfInvalid();
181
182   return state_->value();
183 }
184
185 template <class T>
186 Try<T>& Future<T>::getTry() {
187   throwIfInvalid();
188
189   return state_->getTry();
190 }
191
192 template <class T>
193 template <typename Executor>
194 inline Future<T> Future<T>::via(Executor* executor) {
195   throwIfInvalid();
196   MoveWrapper<Promise<T>> promise;
197
198   auto f = promise->getFuture();
199   // We are obligated to return a cold future.
200   f.deactivate();
201   // But we also need to make this one cold for via to at least work some of
202   // the time. (see below)
203   deactivate();
204
205   then([=](Try<T>&& t) mutable {
206     MoveWrapper<Try<T>> tw(std::move(t));
207     // There is a race here.
208     // When the promise is fulfilled, and the future is still inactive, when
209     // the future is activated (e.g. by destruction) the callback will happen
210     // in that context, not in the intended context (which has already left
211     // the building).
212     //
213     // Currently, this will work fine because all the temporaries are
214     // destructed in an order that is compatible with this implementation:
215     //
216     //   makeFuture().via(x).then(a).then(b);
217     //
218     // However, this will not work reliably:
219     //
220     //   auto f2 = makeFuture().via(x);
221     //   f2.then(a).then(b);
222     //
223     // Because the first temporary is destructed on the first line, and the
224     // executor is fed. But by the time f2 is destructed, the executor
225     // may have already fulfilled the promise on the other thread.
226     //
227     // TODO(#4920689) fix it
228     executor->add([=]() mutable { promise->fulfilTry(std::move(*tw)); });
229   });
230
231   return f;
232 }
233
234 template <class T>
235 bool Future<T>::isReady() const {
236   throwIfInvalid();
237   return state_->ready();
238 }
239
240 // makeFuture
241
242 template <class T>
243 Future<typename std::decay<T>::type> makeFuture(T&& t) {
244   Promise<typename std::decay<T>::type> p;
245   auto f = p.getFuture();
246   p.setValue(std::forward<T>(t));
247   return std::move(f);
248 }
249
250 inline // for multiple translation units
251 Future<void> makeFuture() {
252   Promise<void> p;
253   auto f = p.getFuture();
254   p.setValue();
255   return std::move(f);
256 }
257
258 template <class F>
259 auto makeFutureTry(
260     F&& func,
261     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
262     -> Future<decltype(func())> {
263   Promise<decltype(func())> p;
264   auto f = p.getFuture();
265   p.fulfil(
266     [&func]() {
267       return (func)();
268     });
269   return std::move(f);
270 }
271
272 template <class F>
273 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
274   F copy = func;
275   return makeFutureTry(std::move(copy));
276 }
277
278 template <class T>
279 Future<T> makeFuture(std::exception_ptr const& e) {
280   Promise<T> p;
281   auto f = p.getFuture();
282   p.setException(e);
283   return std::move(f);
284 }
285
286 template <class T, class E>
287 typename std::enable_if<std::is_base_of<std::exception, E>::value,
288                         Future<T>>::type
289 makeFuture(E const& e) {
290   Promise<T> p;
291   auto f = p.getFuture();
292   p.fulfil([&]() -> T { throw e; });
293   return std::move(f);
294 }
295
296 template <class T>
297 Future<T> makeFuture(Try<T>&& t) {
298   try {
299     return makeFuture<T>(std::move(t.value()));
300   } catch (...) {
301     return makeFuture<T>(std::current_exception());
302   }
303 }
304
305 template <>
306 inline Future<void> makeFuture(Try<void>&& t) {
307   try {
308     t.throwIfFailed();
309     return makeFuture();
310   } catch (...) {
311     return makeFuture<void>(std::current_exception());
312   }
313 }
314
315 // when (variadic)
316
317 template <typename... Fs>
318 typename detail::VariadicContext<
319   typename std::decay<Fs>::type::value_type...>::type
320 whenAll(Fs&&... fs)
321 {
322   auto ctx =
323     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
324   ctx->total = sizeof...(fs);
325   auto f_saved = ctx->p.getFuture();
326   detail::whenAllVariadicHelper(ctx,
327     std::forward<typename std::decay<Fs>::type>(fs)...);
328   return std::move(f_saved);
329 }
330
331 // when (iterator)
332
333 template <class InputIterator>
334 Future<
335   std::vector<
336   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
337 whenAll(InputIterator first, InputIterator last)
338 {
339   typedef
340     typename std::iterator_traits<InputIterator>::value_type::value_type T;
341
342   auto n = std::distance(first, last);
343   if (n == 0) {
344     return makeFuture(std::vector<Try<T>>());
345   }
346
347   auto ctx = new detail::WhenAllContext<T>();
348
349   ctx->total = n;
350   ctx->results.resize(ctx->total);
351
352   auto f_saved = ctx->p.getFuture();
353
354   for (size_t i = 0; first != last; ++first, ++i) {
355      auto& f = *first;
356      f.setCallback_([ctx, i](Try<T>&& t) {
357          ctx->results[i] = std::move(t);
358          if (++ctx->count == ctx->total) {
359            ctx->p.setValue(std::move(ctx->results));
360            delete ctx;
361          }
362        });
363   }
364
365   return std::move(f_saved);
366 }
367
368 template <class InputIterator>
369 Future<
370   std::pair<size_t,
371             Try<
372               typename
373               std::iterator_traits<InputIterator>::value_type::value_type> > >
374 whenAny(InputIterator first, InputIterator last) {
375   typedef
376     typename std::iterator_traits<InputIterator>::value_type::value_type T;
377
378   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
379   auto f_saved = ctx->p.getFuture();
380
381   for (size_t i = 0; first != last; first++, i++) {
382     auto& f = *first;
383     f.setCallback_([i, ctx](Try<T>&& t) {
384       if (!ctx->done.exchange(true)) {
385         ctx->p.setValue(std::make_pair(i, std::move(t)));
386       }
387       ctx->decref();
388     });
389   }
390
391   return std::move(f_saved);
392 }
393
394 template <class InputIterator>
395 Future<std::vector<std::pair<size_t, Try<typename
396   std::iterator_traits<InputIterator>::value_type::value_type>>>>
397 whenN(InputIterator first, InputIterator last, size_t n) {
398   typedef typename
399     std::iterator_traits<InputIterator>::value_type::value_type T;
400   typedef std::vector<std::pair<size_t, Try<T>>> V;
401
402   struct ctx_t {
403     V v;
404     size_t completed;
405     Promise<V> p;
406   };
407   auto ctx = std::make_shared<ctx_t>();
408   ctx->completed = 0;
409
410   // for each completed Future, increase count and add to vector, until we
411   // have n completed futures at which point we fulfil our Promise with the
412   // vector
413   auto it = first;
414   size_t i = 0;
415   while (it != last) {
416     it->then([ctx, n, i](Try<T>&& t) {
417       auto& v = ctx->v;
418       auto c = ++ctx->completed;
419       if (c <= n) {
420         assert(ctx->v.size() < n);
421         v.push_back(std::make_pair(i, std::move(t)));
422         if (c == n) {
423           ctx->p.fulfilTry(Try<V>(std::move(v)));
424         }
425       }
426     });
427
428     it++;
429     i++;
430   }
431
432   if (i < n) {
433     ctx->p.setException(std::runtime_error("Not enough futures"));
434   }
435
436   return ctx->p.getFuture();
437 }
438
439 template <typename T>
440 Future<T>
441 waitWithSemaphore(Future<T>&& f) {
442   Baton<> baton;
443   auto done = f.then([&](Try<T> &&t) {
444     baton.post();
445     return std::move(t.value());
446   });
447   baton.wait();
448   while (!done.isReady()) {
449     // There's a race here between the return here and the actual finishing of
450     // the future. f is completed, but the setup may not have finished on done
451     // after the baton has posted.
452     std::this_thread::yield();
453   }
454   return done;
455 }
456
457 template<>
458 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
459   Baton<> baton;
460   auto done = f.then([&](Try<void> &&t) {
461     baton.post();
462     t.value();
463   });
464   baton.wait();
465   while (!done.isReady()) {
466     // There's a race here between the return here and the actual finishing of
467     // the future. f is completed, but the setup may not have finished on done
468     // after the baton has posted.
469     std::this_thread::yield();
470   }
471   return done;
472 }
473
474 template <typename T, class Duration>
475 Future<T>
476 waitWithSemaphore(Future<T>&& f, Duration timeout) {
477   auto baton = std::make_shared<Baton<>>();
478   auto done = f.then([baton](Try<T> &&t) {
479     baton->post();
480     return std::move(t.value());
481   });
482   baton->timed_wait(std::chrono::system_clock::now() + timeout);
483   return done;
484 }
485
486 template <class Duration>
487 Future<void>
488 waitWithSemaphore(Future<void>&& f, Duration timeout) {
489   auto baton = std::make_shared<Baton<>>();
490   auto done = f.then([baton](Try<void> &&t) {
491     baton->post();
492     t.value();
493   });
494   baton->timed_wait(std::chrono::system_clock::now() + timeout);
495   return done;
496 }
497
498 }}
499
500 // I haven't included a Future<T&> specialization because I don't forsee us
501 // using it, however it is not difficult to add when needed. Refer to
502 // Future<void> for guidance. std::future and boost::future code would also be
503 // instructive.