3783126e8b72d624776556363def5863d4739f1c
[folly.git] / folly / wangle / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include "detail/State.h"
20 #include <folly/LifoSem.h>
21
22 namespace folly { namespace wangle {
23
24 template <typename T>
25 struct isFuture {
26   static const bool value = false;
27 };
28
29 template <typename T>
30 struct isFuture<Future<T> > {
31   static const bool value = true;
32 };
33
34 template <class T>
35 Future<T>::Future(Future<T>&& other) noexcept : state_(nullptr) {
36   *this = std::move(other);
37 }
38
39 template <class T>
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41   std::swap(state_, other.state_);
42   return *this;
43 }
44
45 template <class T>
46 Future<T>::~Future() {
47   detach();
48 }
49
50 template <class T>
51 void Future<T>::detach() {
52   if (state_) {
53     state_->detachFuture();
54     state_ = nullptr;
55   }
56 }
57
58 template <class T>
59 void Future<T>::throwIfInvalid() const {
60   if (!state_)
61     throw NoState();
62 }
63
64 template <class T>
65 template <class F>
66 void Future<T>::setCallback_(F&& func) {
67   throwIfInvalid();
68   state_->setCallback(std::move(func));
69 }
70
71 template <class T>
72 template <class F>
73 typename std::enable_if<
74   !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
75   Future<typename std::result_of<F(Try<T>&&)>::type> >::type
76 Future<T>::then(F&& func) {
77   typedef typename std::result_of<F(Try<T>&&)>::type B;
78
79   throwIfInvalid();
80
81   // wrap these so we can move them into the lambda
82   folly::MoveWrapper<Promise<B>> p;
83   folly::MoveWrapper<F> funcm(std::forward<F>(func));
84
85   // grab the Future now before we lose our handle on the Promise
86   auto f = p->getFuture();
87
88   /* This is a bit tricky.
89
90      We can't just close over *this in case this Future gets moved. So we
91      make a new dummy Future. We could figure out something more
92      sophisticated that avoids making a new Future object when it can, as an
93      optimization. But this is correct.
94
95      state_ can't be moved, it is explicitly disallowed (as is copying). But
96      if there's ever a reason to allow it, this is one place that makes that
97      assumption and would need to be fixed. We use a standard shared pointer
98      for state_ (by copying it in), which means in essence obj holds a shared
99      pointer to itself.  But this shouldn't leak because Promise will not
100      outlive the continuation, because Promise will setException() with a
101      broken Promise if it is destructed before completed. We could use a
102      weak pointer but it would have to be converted to a shared pointer when
103      func is executed (because the Future returned by func may possibly
104      persist beyond the callback, if it gets moved), and so it is an
105      optimization to just make it shared from the get-go.
106
107      We have to move in the Promise and func using the MoveWrapper
108      hack. (func could be copied but it's a big drag on perf).
109
110      Two subtle but important points about this design. detail::State has no
111      back pointers to Future or Promise, so if Future or Promise get moved
112      (and they will be moved in performant code) we don't have to do
113      anything fancy. And because we store the continuation in the
114      detail::State, not in the Future, we can execute the continuation even
115      after the Future has gone out of scope. This is an intentional design
116      decision. It is likely we will want to be able to cancel a continuation
117      in some circumstances, but I think it should be explicit not implicit
118      in the destruction of the Future used to create it.
119      */
120   setCallback_(
121     [p, funcm](Try<T>&& t) mutable {
122       p->fulfil([&]() {
123           return (*funcm)(std::move(t));
124         });
125     });
126
127   return std::move(f);
128 }
129
130 template <class T>
131 template <class F>
132 typename std::enable_if<
133   isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
134   Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
135 Future<T>::then(F&& func) {
136   typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
137
138   throwIfInvalid();
139
140   // wrap these so we can move them into the lambda
141   folly::MoveWrapper<Promise<B>> p;
142   folly::MoveWrapper<F> funcm(std::forward<F>(func));
143
144   // grab the Future now before we lose our handle on the Promise
145   auto f = p->getFuture();
146
147   setCallback_(
148     [p, funcm](Try<T>&& t) mutable {
149       try {
150         auto f2 = (*funcm)(std::move(t));
151         // that didn't throw, now we can steal p
152         f2.setCallback_([p](Try<B>&& b) mutable {
153             p->fulfilTry(std::move(b));
154           });
155       } catch (...) {
156         p->setException(std::current_exception());
157       }
158     });
159
160   return std::move(f);
161 }
162
163 template <class T>
164 Future<void> Future<T>::then() {
165   return then([] (Try<T>&& t) {});
166 }
167
168 template <class T>
169 typename std::add_lvalue_reference<T>::type Future<T>::value() {
170   throwIfInvalid();
171
172   return state_->value();
173 }
174
175 template <class T>
176 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
177   throwIfInvalid();
178
179   return state_->value();
180 }
181
182 template <class T>
183 Try<T>& Future<T>::getTry() {
184   throwIfInvalid();
185
186   return state_->getTry();
187 }
188
189 template <class T>
190 template <typename Executor>
191 inline Future<T> Future<T>::via(Executor* executor) {
192   throwIfInvalid();
193   auto f = then([=](Try<T>&& t) {
194     MoveWrapper<Promise<T>> promise;
195     MoveWrapper<Try<T>> tw(std::move(t));
196     auto f2 = promise->getFuture();
197     executor->add([=]() mutable { promise->fulfilTry(std::move(*tw)); });
198     return f2;
199   });
200   f.deactivate();
201   return f;
202 }
203
204 template <class T>
205 bool Future<T>::isReady() const {
206   throwIfInvalid();
207   return state_->ready();
208 }
209
210 // makeFuture
211
212 template <class T>
213 Future<typename std::decay<T>::type> makeFuture(T&& t) {
214   Promise<typename std::decay<T>::type> p;
215   auto f = p.getFuture();
216   p.setValue(std::forward<T>(t));
217   return std::move(f);
218 }
219
220 inline // for multiple translation units
221 Future<void> makeFuture() {
222   Promise<void> p;
223   auto f = p.getFuture();
224   p.setValue();
225   return std::move(f);
226 }
227
228 template <class F>
229 auto makeFutureTry(
230     F&& func,
231     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
232     -> Future<decltype(func())> {
233   Promise<decltype(func())> p;
234   auto f = p.getFuture();
235   p.fulfil(
236     [&func]() {
237       return (func)();
238     });
239   return std::move(f);
240 }
241
242 template <class F>
243 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
244   F copy = func;
245   return makeFutureTry(std::move(copy));
246 }
247
248 template <class T>
249 Future<T> makeFuture(std::exception_ptr const& e) {
250   Promise<T> p;
251   auto f = p.getFuture();
252   p.setException(e);
253   return std::move(f);
254 }
255
256 template <class T, class E>
257 typename std::enable_if<std::is_base_of<std::exception, E>::value, Future<T>>::type
258 makeFuture(E const& e) {
259   Promise<T> p;
260   auto f = p.getFuture();
261   p.fulfil([&]() -> T { throw e; });
262   return std::move(f);
263 }
264
265 template <class T>
266 Future<T> makeFuture(Try<T>&& t) {
267   try {
268     return makeFuture<T>(std::move(t.value()));
269   } catch (...) {
270     return makeFuture<T>(std::current_exception());
271   }
272 }
273
274 template <>
275 inline Future<void> makeFuture(Try<void>&& t) {
276   try {
277     t.throwIfFailed();
278     return makeFuture();
279   } catch (...) {
280     return makeFuture<void>(std::current_exception());
281   }
282 }
283
284 // when (variadic)
285
286 template <typename... Fs>
287 typename detail::VariadicContext<
288   typename std::decay<Fs>::type::value_type...>::type
289 whenAll(Fs&&... fs)
290 {
291   auto ctx =
292     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
293   ctx->total = sizeof...(fs);
294   auto f_saved = ctx->p.getFuture();
295   detail::whenAllVariadicHelper(ctx,
296     std::forward<typename std::decay<Fs>::type>(fs)...);
297   return std::move(f_saved);
298 }
299
300 // when (iterator)
301
302 template <class InputIterator>
303 Future<
304   std::vector<
305   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
306 whenAll(InputIterator first, InputIterator last)
307 {
308   typedef
309     typename std::iterator_traits<InputIterator>::value_type::value_type T;
310
311   auto n = std::distance(first, last);
312   if (n == 0) {
313     return makeFuture(std::vector<Try<T>>());
314   }
315
316   auto ctx = new detail::WhenAllContext<T>();
317
318   ctx->total = n;
319   ctx->results.resize(ctx->total);
320
321   auto f_saved = ctx->p.getFuture();
322
323   for (size_t i = 0; first != last; ++first, ++i) {
324      auto& f = *first;
325      f.setCallback_([ctx, i](Try<T>&& t) {
326          ctx->results[i] = std::move(t);
327          if (++ctx->count == ctx->total) {
328            ctx->p.setValue(std::move(ctx->results));
329            delete ctx;
330          }
331        });
332   }
333
334   return std::move(f_saved);
335 }
336
337 template <class InputIterator>
338 Future<
339   std::pair<size_t,
340             Try<
341               typename
342               std::iterator_traits<InputIterator>::value_type::value_type> > >
343 whenAny(InputIterator first, InputIterator last) {
344   typedef
345     typename std::iterator_traits<InputIterator>::value_type::value_type T;
346
347   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
348   auto f_saved = ctx->p.getFuture();
349
350   for (size_t i = 0; first != last; first++, i++) {
351     auto& f = *first;
352     f.setCallback_([i, ctx](Try<T>&& t) {
353       if (!ctx->done.exchange(true)) {
354         ctx->p.setValue(std::make_pair(i, std::move(t)));
355       }
356       ctx->decref();
357     });
358   }
359
360   return std::move(f_saved);
361 }
362
363 template <class InputIterator>
364 Future<std::vector<std::pair<size_t, Try<typename
365   std::iterator_traits<InputIterator>::value_type::value_type>>>>
366 whenN(InputIterator first, InputIterator last, size_t n) {
367   typedef typename
368     std::iterator_traits<InputIterator>::value_type::value_type T;
369   typedef std::vector<std::pair<size_t, Try<T>>> V;
370
371   struct ctx_t {
372     V v;
373     size_t completed;
374     Promise<V> p;
375   };
376   auto ctx = std::make_shared<ctx_t>();
377   ctx->completed = 0;
378
379   // for each completed Future, increase count and add to vector, until we
380   // have n completed futures at which point we fulfil our Promise with the
381   // vector
382   auto it = first;
383   size_t i = 0;
384   while (it != last) {
385     it->then([ctx, n, i](Try<T>&& t) {
386       auto& v = ctx->v;
387       auto c = ++ctx->completed;
388       if (c <= n) {
389         assert(ctx->v.size() < n);
390         v.push_back(std::make_pair(i, std::move(t)));
391         if (c == n) {
392           ctx->p.fulfilTry(Try<V>(std::move(v)));
393         }
394       }
395     });
396
397     it++;
398     i++;
399   }
400
401   if (i < n) {
402     ctx->p.setException(std::runtime_error("Not enough futures"));
403   }
404
405   return ctx->p.getFuture();
406 }
407
408 template <typename T>
409 Future<T>
410 waitWithSemaphore(Future<T>&& f) {
411   LifoSem sem;
412   auto done = f.then([&](Try<T> &&t) {
413     sem.post();
414     return std::move(t.value());
415   });
416   sem.wait();
417   return done;
418 }
419
420 template<>
421 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
422   LifoSem sem;
423   auto done = f.then([&](Try<void> &&t) {
424     sem.post();
425     t.value();
426   });
427   sem.wait();
428   return done;
429 }
430
431 template <typename T, class Duration>
432 Future<T>
433 waitWithSemaphore(Future<T>&& f, Duration timeout) {
434   auto sem = std::make_shared<LifoSem>();
435   auto done = f.then([sem](Try<T> &&t) {
436     sem->post();
437     return std::move(t.value());
438   });
439   std::thread t([sem, timeout](){
440     std::this_thread::sleep_for(timeout);
441     sem->shutdown();
442     });
443   t.detach();
444   try {
445     sem->wait();
446   } catch (ShutdownSemError & ign) { }
447   return done;
448 }
449
450 template <class Duration>
451 Future<void>
452 waitWithSemaphore(Future<void>&& f, Duration timeout) {
453   auto sem = std::make_shared<LifoSem>();
454   auto done = f.then([sem](Try<void> &&t) {
455     sem->post();
456     t.value();
457   });
458   std::thread t([sem, timeout](){
459     std::this_thread::sleep_for(timeout);
460     sem->shutdown();
461     });
462   t.detach();
463   try {
464     sem->wait();
465   } catch (ShutdownSemError & ign) { }
466   return done;
467 }
468
469 }}
470
471 // I haven't included a Future<T&> specialization because I don't forsee us
472 // using it, however it is not difficult to add when needed. Refer to
473 // Future<void> for guidance. std::future and boost::future code would also be
474 // instructive.