1e17e7cb40d4c3c8769a7caa35fe0200df40b6c2
[folly.git] / folly / wangle / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include "detail/State.h"
20 #include <folly/LifoSem.h>
21
22 namespace folly { namespace wangle {
23
24 template <typename T>
25 struct isFuture {
26   static const bool value = false;
27 };
28
29 template <typename T>
30 struct isFuture<Future<T> > {
31   static const bool value = true;
32 };
33
34 template <class T>
35 Future<T>::Future(Future<T>&& other) noexcept : state_(nullptr) {
36   *this = std::move(other);
37 }
38
39 template <class T>
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41   std::swap(state_, other.state_);
42   return *this;
43 }
44
45 template <class T>
46 Future<T>::~Future() {
47   detach();
48 }
49
50 template <class T>
51 void Future<T>::detach() {
52   if (state_) {
53     state_->detachFuture();
54     state_ = nullptr;
55   }
56 }
57
58 template <class T>
59 void Future<T>::throwIfInvalid() const {
60   if (!state_)
61     throw NoState();
62 }
63
64 template <class T>
65 template <class F>
66 void Future<T>::setCallback_(F&& func) {
67   throwIfInvalid();
68   state_->setCallback(std::move(func));
69 }
70
71 template <class T>
72 template <class F>
73 typename std::enable_if<
74   !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
75   Future<typename std::result_of<F(Try<T>&&)>::type> >::type
76 Future<T>::then(F&& func) {
77   typedef typename std::result_of<F(Try<T>&&)>::type B;
78
79   throwIfInvalid();
80
81   // wrap these so we can move them into the lambda
82   folly::MoveWrapper<Promise<B>> p;
83   folly::MoveWrapper<F> funcm(std::forward<F>(func));
84
85   // grab the Future now before we lose our handle on the Promise
86   auto f = p->getFuture();
87
88   /* This is a bit tricky.
89
90      We can't just close over *this in case this Future gets moved. So we
91      make a new dummy Future. We could figure out something more
92      sophisticated that avoids making a new Future object when it can, as an
93      optimization. But this is correct.
94
95      state_ can't be moved, it is explicitly disallowed (as is copying). But
96      if there's ever a reason to allow it, this is one place that makes that
97      assumption and would need to be fixed. We use a standard shared pointer
98      for state_ (by copying it in), which means in essence obj holds a shared
99      pointer to itself.  But this shouldn't leak because Promise will not
100      outlive the continuation, because Promise will setException() with a
101      broken Promise if it is destructed before completed. We could use a
102      weak pointer but it would have to be converted to a shared pointer when
103      func is executed (because the Future returned by func may possibly
104      persist beyond the callback, if it gets moved), and so it is an
105      optimization to just make it shared from the get-go.
106
107      We have to move in the Promise and func using the MoveWrapper
108      hack. (func could be copied but it's a big drag on perf).
109
110      Two subtle but important points about this design. detail::State has no
111      back pointers to Future or Promise, so if Future or Promise get moved
112      (and they will be moved in performant code) we don't have to do
113      anything fancy. And because we store the continuation in the
114      detail::State, not in the Future, we can execute the continuation even
115      after the Future has gone out of scope. This is an intentional design
116      decision. It is likely we will want to be able to cancel a continuation
117      in some circumstances, but I think it should be explicit not implicit
118      in the destruction of the Future used to create it.
119      */
120   setCallback_(
121     [p, funcm](Try<T>&& t) mutable {
122       p->fulfil([&]() {
123           return (*funcm)(std::move(t));
124         });
125     });
126
127   return std::move(f);
128 }
129
130 template <class T>
131 template <class F>
132 typename std::enable_if<
133   isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
134   Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
135 Future<T>::then(F&& func) {
136   typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
137
138   throwIfInvalid();
139
140   // wrap these so we can move them into the lambda
141   folly::MoveWrapper<Promise<B>> p;
142   folly::MoveWrapper<F> funcm(std::forward<F>(func));
143
144   // grab the Future now before we lose our handle on the Promise
145   auto f = p->getFuture();
146
147   setCallback_(
148     [p, funcm](Try<T>&& t) mutable {
149       try {
150         auto f2 = (*funcm)(std::move(t));
151         // that didn't throw, now we can steal p
152         f2.setCallback_([p](Try<B>&& b) mutable {
153             p->fulfilTry(std::move(b));
154           });
155       } catch (...) {
156         p->setException(std::current_exception());
157       }
158     });
159
160   return std::move(f);
161 }
162
163 template <class T>
164 Future<void> Future<T>::then() {
165   return then([] (Try<T>&& t) {});
166 }
167
168 template <class T>
169 typename std::add_lvalue_reference<T>::type Future<T>::value() {
170   throwIfInvalid();
171
172   return state_->value();
173 }
174
175 template <class T>
176 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
177   throwIfInvalid();
178
179   return state_->value();
180 }
181
182 template <class T>
183 Try<T>& Future<T>::getTry() {
184   throwIfInvalid();
185
186   return state_->getTry();
187 }
188
189 template <class T>
190 template <typename Executor>
191 inline Later<T> Future<T>::via(Executor* executor) {
192   throwIfInvalid();
193   return Later<T>(std::move(*this)).via(executor);
194 }
195
196 template <class T>
197 bool Future<T>::isReady() const {
198   throwIfInvalid();
199   return state_->ready();
200 }
201
202 // makeFuture
203
204 template <class T>
205 Future<typename std::decay<T>::type> makeFuture(T&& t) {
206   Promise<typename std::decay<T>::type> p;
207   auto f = p.getFuture();
208   p.setValue(std::forward<T>(t));
209   return std::move(f);
210 }
211
212 inline // for multiple translation units
213 Future<void> makeFuture() {
214   Promise<void> p;
215   auto f = p.getFuture();
216   p.setValue();
217   return std::move(f);
218 }
219
220 template <class F>
221 auto makeFutureTry(
222     F&& func,
223     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
224     -> Future<decltype(func())> {
225   Promise<decltype(func())> p;
226   auto f = p.getFuture();
227   p.fulfil(
228     [&func]() {
229       return (func)();
230     });
231   return std::move(f);
232 }
233
234 template <class F>
235 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
236   F copy = func;
237   return makeFutureTry(std::move(copy));
238 }
239
240 template <class T>
241 Future<T> makeFuture(std::exception_ptr const& e) {
242   Promise<T> p;
243   auto f = p.getFuture();
244   p.setException(e);
245   return std::move(f);
246 }
247
248 template <class T, class E>
249 typename std::enable_if<std::is_base_of<std::exception, E>::value, Future<T>>::type
250 makeFuture(E const& e) {
251   Promise<T> p;
252   auto f = p.getFuture();
253   p.fulfil([&]() -> T { throw e; });
254   return std::move(f);
255 }
256
257 template <class T>
258 Future<T> makeFuture(Try<T>&& t) {
259   try {
260     return makeFuture<T>(std::move(t.value()));
261   } catch (...) {
262     return makeFuture<T>(std::current_exception());
263   }
264 }
265
266 template <>
267 inline Future<void> makeFuture(Try<void>&& t) {
268   try {
269     t.throwIfFailed();
270     return makeFuture();
271   } catch (...) {
272     return makeFuture<void>(std::current_exception());
273   }
274 }
275
276 // when (variadic)
277
278 template <typename... Fs>
279 typename detail::VariadicContext<
280   typename std::decay<Fs>::type::value_type...>::type
281 whenAll(Fs&&... fs)
282 {
283   auto ctx =
284     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
285   ctx->total = sizeof...(fs);
286   auto f_saved = ctx->p.getFuture();
287   detail::whenAllVariadicHelper(ctx,
288     std::forward<typename std::decay<Fs>::type>(fs)...);
289   return std::move(f_saved);
290 }
291
292 // when (iterator)
293
294 template <class InputIterator>
295 Future<
296   std::vector<
297   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
298 whenAll(InputIterator first, InputIterator last)
299 {
300   typedef
301     typename std::iterator_traits<InputIterator>::value_type::value_type T;
302
303   auto n = std::distance(first, last);
304   if (n == 0) {
305     return makeFuture(std::vector<Try<T>>());
306   }
307
308   auto ctx = new detail::WhenAllContext<T>();
309
310   ctx->total = n;
311   ctx->results.resize(ctx->total);
312
313   auto f_saved = ctx->p.getFuture();
314
315   for (size_t i = 0; first != last; ++first, ++i) {
316      auto& f = *first;
317      f.setCallback_([ctx, i](Try<T>&& t) {
318          ctx->results[i] = std::move(t);
319          if (++ctx->count == ctx->total) {
320            ctx->p.setValue(std::move(ctx->results));
321            delete ctx;
322          }
323        });
324   }
325
326   return std::move(f_saved);
327 }
328
329 template <class InputIterator>
330 Future<
331   std::pair<size_t,
332             Try<
333               typename
334               std::iterator_traits<InputIterator>::value_type::value_type> > >
335 whenAny(InputIterator first, InputIterator last) {
336   typedef
337     typename std::iterator_traits<InputIterator>::value_type::value_type T;
338
339   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
340   auto f_saved = ctx->p.getFuture();
341
342   for (size_t i = 0; first != last; first++, i++) {
343     auto& f = *first;
344     f.setCallback_([i, ctx](Try<T>&& t) {
345       if (!ctx->done.exchange(true)) {
346         ctx->p.setValue(std::make_pair(i, std::move(t)));
347       }
348       ctx->decref();
349     });
350   }
351
352   return std::move(f_saved);
353 }
354
355 template <class InputIterator>
356 Future<std::vector<std::pair<size_t, Try<typename
357   std::iterator_traits<InputIterator>::value_type::value_type>>>>
358 whenN(InputIterator first, InputIterator last, size_t n) {
359   typedef typename
360     std::iterator_traits<InputIterator>::value_type::value_type T;
361   typedef std::vector<std::pair<size_t, Try<T>>> V;
362
363   struct ctx_t {
364     V v;
365     size_t completed;
366     Promise<V> p;
367   };
368   auto ctx = std::make_shared<ctx_t>();
369   ctx->completed = 0;
370
371   // for each completed Future, increase count and add to vector, until we
372   // have n completed futures at which point we fulfil our Promise with the
373   // vector
374   auto it = first;
375   size_t i = 0;
376   while (it != last) {
377     it->then([ctx, n, i](Try<T>&& t) {
378       auto& v = ctx->v;
379       auto c = ++ctx->completed;
380       if (c <= n) {
381         assert(ctx->v.size() < n);
382         v.push_back(std::make_pair(i, std::move(t)));
383         if (c == n) {
384           ctx->p.fulfilTry(Try<V>(std::move(v)));
385         }
386       }
387     });
388
389     it++;
390     i++;
391   }
392
393   if (i < n) {
394     ctx->p.setException(std::runtime_error("Not enough futures"));
395   }
396
397   return ctx->p.getFuture();
398 }
399
400 template <typename T>
401 Future<T>
402 waitWithSemaphore(Future<T>&& f) {
403   LifoSem sem;
404   auto done = f.then([&](Try<T> &&t) {
405     sem.post();
406     return std::move(t.value());
407   });
408   sem.wait();
409   return done;
410 }
411
412 template<>
413 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
414   LifoSem sem;
415   auto done = f.then([&](Try<void> &&t) {
416     sem.post();
417     t.value();
418   });
419   sem.wait();
420   return done;
421 }
422
423 template <typename T, class Duration>
424 Future<T>
425 waitWithSemaphore(Future<T>&& f, Duration timeout) {
426   auto sem = std::make_shared<LifoSem>();
427   auto done = f.then([sem](Try<T> &&t) {
428     sem->post();
429     return std::move(t.value());
430   });
431   std::thread t([sem, timeout](){
432     std::this_thread::sleep_for(timeout);
433     sem->shutdown();
434     });
435   t.detach();
436   try {
437     sem->wait();
438   } catch (ShutdownSemError & ign) { }
439   return done;
440 }
441
442 template <class Duration>
443 Future<void>
444 waitWithSemaphore(Future<void>&& f, Duration timeout) {
445   auto sem = std::make_shared<LifoSem>();
446   auto done = f.then([sem](Try<void> &&t) {
447     sem->post();
448     t.value();
449   });
450   std::thread t([sem, timeout](){
451     std::this_thread::sleep_for(timeout);
452     sem->shutdown();
453     });
454   t.detach();
455   try {
456     sem->wait();
457   } catch (ShutdownSemError & ign) { }
458   return done;
459 }
460
461 }}
462
463 // I haven't included a Future<T&> specialization because I don't forsee us
464 // using it, however it is not difficult to add when needed. Refer to
465 // Future<void> for guidance. std::future and boost::future code would also be
466 // instructive.