via and activate/deactivate chaining
[folly.git] / folly / wangle / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/wangle/detail/Core.h>
23 #include <folly/Baton.h>
24
25 namespace folly { namespace wangle {
26
27 template <typename T>
28 struct isFuture {
29   static const bool value = false;
30 };
31
32 template <typename T>
33 struct isFuture<Future<T> > {
34   static const bool value = true;
35 };
36
37 template <class T>
38 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
39   *this = std::move(other);
40 }
41
42 template <class T>
43 Future<T>& Future<T>::operator=(Future<T>&& other) {
44   std::swap(core_, other.core_);
45   return *this;
46 }
47
48 template <class T>
49 Future<T>::~Future() {
50   detach();
51 }
52
53 template <class T>
54 void Future<T>::detach() {
55   if (core_) {
56     core_->detachFuture();
57     core_ = nullptr;
58   }
59 }
60
61 template <class T>
62 void Future<T>::throwIfInvalid() const {
63   if (!core_)
64     throw NoState();
65 }
66
67 template <class T>
68 template <class F>
69 void Future<T>::setCallback_(F&& func) {
70   throwIfInvalid();
71   core_->setCallback(std::move(func));
72 }
73
74 // Variant: f.then([](Try<T>&& t){ return t.value(); });
75 template <class T>
76 template <class F>
77 typename std::enable_if<
78   !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
79   Future<typename std::result_of<F(Try<T>&&)>::type> >::type
80 Future<T>::then(F&& func) {
81   typedef typename std::result_of<F(Try<T>&&)>::type B;
82
83   throwIfInvalid();
84
85   // wrap these so we can move them into the lambda
86   folly::MoveWrapper<Promise<B>> p;
87   folly::MoveWrapper<F> funcm(std::forward<F>(func));
88
89   // grab the Future now before we lose our handle on the Promise
90   auto f = p->getFuture();
91
92   /* This is a bit tricky.
93
94      We can't just close over *this in case this Future gets moved. So we
95      make a new dummy Future. We could figure out something more
96      sophisticated that avoids making a new Future object when it can, as an
97      optimization. But this is correct.
98
99      core_ can't be moved, it is explicitly disallowed (as is copying). But
100      if there's ever a reason to allow it, this is one place that makes that
101      assumption and would need to be fixed. We use a standard shared pointer
102      for core_ (by copying it in), which means in essence obj holds a shared
103      pointer to itself.  But this shouldn't leak because Promise will not
104      outlive the continuation, because Promise will setException() with a
105      broken Promise if it is destructed before completed. We could use a
106      weak pointer but it would have to be converted to a shared pointer when
107      func is executed (because the Future returned by func may possibly
108      persist beyond the callback, if it gets moved), and so it is an
109      optimization to just make it shared from the get-go.
110
111      We have to move in the Promise and func using the MoveWrapper
112      hack. (func could be copied but it's a big drag on perf).
113
114      Two subtle but important points about this design. detail::Core has no
115      back pointers to Future or Promise, so if Future or Promise get moved
116      (and they will be moved in performant code) we don't have to do
117      anything fancy. And because we store the continuation in the
118      detail::Core, not in the Future, we can execute the continuation even
119      after the Future has gone out of scope. This is an intentional design
120      decision. It is likely we will want to be able to cancel a continuation
121      in some circumstances, but I think it should be explicit not implicit
122      in the destruction of the Future used to create it.
123      */
124   setCallback_(
125     [p, funcm](Try<T>&& t) mutable {
126       p->fulfil([&]() {
127           return (*funcm)(std::move(t));
128         });
129     });
130
131   return std::move(f);
132 }
133
134 // Variant: f.then([](T&& t){ return t; });
135 template <class T>
136 template <class F>
137 typename std::enable_if<
138   !std::is_same<T, void>::value &&
139   !isFuture<typename std::result_of<
140     F(typename detail::AliasIfVoid<T>::type&&)>::type>::value,
141   Future<typename std::result_of<
142     F(typename detail::AliasIfVoid<T>::type&&)>::type> >::type
143 Future<T>::then(F&& func) {
144   typedef typename std::result_of<F(T&&)>::type B;
145
146   throwIfInvalid();
147
148   folly::MoveWrapper<Promise<B>> p;
149   folly::MoveWrapper<F> funcm(std::forward<F>(func));
150   auto f = p->getFuture();
151
152   setCallback_(
153     [p, funcm](Try<T>&& t) mutable {
154       if (t.hasException()) {
155         p->setException(t.getException());
156       } else {
157         p->fulfil([&]() {
158           return (*funcm)(std::move(t.value()));
159         });
160       }
161     });
162
163   return std::move(f);
164 }
165
166 // Variant: f.then([](){ return; });
167 template <class T>
168 template <class F>
169 typename std::enable_if<
170   std::is_same<T, void>::value &&
171   !isFuture<typename std::result_of<F()>::type>::value,
172   Future<typename std::result_of<F()>::type> >::type
173 Future<T>::then(F&& func) {
174   typedef typename std::result_of<F()>::type B;
175
176   throwIfInvalid();
177
178   folly::MoveWrapper<Promise<B>> p;
179   folly::MoveWrapper<F> funcm(std::forward<F>(func));
180   auto f = p->getFuture();
181
182   setCallback_(
183     [p, funcm](Try<T>&& t) mutable {
184       if (t.hasException()) {
185         p->setException(t.getException());
186       } else {
187         p->fulfil([&]() {
188           return (*funcm)();
189         });
190       }
191     });
192
193   return std::move(f);
194 }
195
196 // Variant: f.then([](Try<T>&& t){ return makeFuture<T>(t.value()); });
197 template <class T>
198 template <class F>
199 typename std::enable_if<
200   isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
201   Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
202 Future<T>::then(F&& func) {
203   typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
204
205   throwIfInvalid();
206
207   // wrap these so we can move them into the lambda
208   folly::MoveWrapper<Promise<B>> p;
209   folly::MoveWrapper<F> funcm(std::forward<F>(func));
210
211   // grab the Future now before we lose our handle on the Promise
212   auto f = p->getFuture();
213
214   setCallback_(
215     [p, funcm](Try<T>&& t) mutable {
216       try {
217         auto f2 = (*funcm)(std::move(t));
218         // that didn't throw, now we can steal p
219         f2.setCallback_([p](Try<B>&& b) mutable {
220             p->fulfilTry(std::move(b));
221           });
222       } catch (...) {
223         p->setException(std::current_exception());
224       }
225     });
226
227   return std::move(f);
228 }
229
230 // Variant: f.then([](T&& t){ return makeFuture<T>(t); });
231 template <class T>
232 template <class F>
233 typename std::enable_if<
234   !std::is_same<T, void>::value &&
235   isFuture<typename std::result_of<
236     F(typename detail::AliasIfVoid<T>::type&&)>::type>::value,
237   Future<typename std::result_of<
238     F(typename detail::AliasIfVoid<T>::type&&)>::type::value_type> >::type
239 Future<T>::then(F&& func) {
240   typedef typename std::result_of<F(T&&)>::type::value_type B;
241
242   throwIfInvalid();
243
244   folly::MoveWrapper<Promise<B>> p;
245   folly::MoveWrapper<F> funcm(std::forward<F>(func));
246   auto f = p->getFuture();
247
248   setCallback_(
249     [p, funcm](Try<T>&& t) mutable {
250       if (t.hasException()) {
251         p->setException(t.getException());
252       } else {
253         try {
254           auto f2 = (*funcm)(std::move(t.value()));
255           f2.setCallback_([p](Try<B>&& b) mutable {
256               p->fulfilTry(std::move(b));
257             });
258         } catch (...) {
259           p->setException(std::current_exception());
260         }
261       }
262     });
263
264   return std::move(f);
265 }
266
267 // Variant: f.then([](){ return makeFuture(); });
268 template <class T>
269 template <class F>
270 typename std::enable_if<
271   std::is_same<T, void>::value &&
272   isFuture<typename std::result_of<F()>::type>::value,
273   Future<typename std::result_of<F()>::type::value_type> >::type
274 Future<T>::then(F&& func) {
275   typedef typename std::result_of<F()>::type::value_type B;
276
277   throwIfInvalid();
278
279   folly::MoveWrapper<Promise<B>> p;
280   folly::MoveWrapper<F> funcm(std::forward<F>(func));
281
282   auto f = p->getFuture();
283
284   setCallback_(
285     [p, funcm](Try<T>&& t) mutable {
286       if (t.hasException()) {
287         p->setException(t.getException());
288       } else {
289         try {
290           auto f2 = (*funcm)();
291           f2.setCallback_([p](Try<B>&& b) mutable {
292               p->fulfilTry(std::move(b));
293             });
294         } catch (...) {
295           p->setException(std::current_exception());
296         }
297       }
298     });
299
300   return std::move(f);
301 }
302
303 template <class T>
304 Future<void> Future<T>::then() {
305   return then([] (Try<T>&& t) {});
306 }
307
308 template <class T>
309 typename std::add_lvalue_reference<T>::type Future<T>::value() {
310   throwIfInvalid();
311
312   return core_->getTry().value();
313 }
314
315 template <class T>
316 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
317   throwIfInvalid();
318
319   return core_->getTry().value();
320 }
321
322 template <class T>
323 Try<T>& Future<T>::getTry() {
324   throwIfInvalid();
325
326   return core_->getTry();
327 }
328
329 template <class T>
330 template <typename Executor>
331 inline Future<T> Future<T>::via(Executor* executor) && {
332   throwIfInvalid();
333
334   this->deactivate();
335   core_->setExecutor(executor);
336
337   return std::move(*this);
338 }
339
340 template <class T>
341 template <typename Executor>
342 inline Future<T> Future<T>::via(Executor* executor) & {
343   throwIfInvalid();
344
345   MoveWrapper<Promise<T>> p;
346   auto f = p->getFuture();
347   then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
348   return std::move(f).via(executor);
349 }
350
351 template <class T>
352 bool Future<T>::isReady() const {
353   throwIfInvalid();
354   return core_->ready();
355 }
356
357 template <class T>
358 void Future<T>::raise(std::exception_ptr exception) {
359   core_->raise(exception);
360 }
361
362 // makeFuture
363
364 template <class T>
365 Future<typename std::decay<T>::type> makeFuture(T&& t) {
366   Promise<typename std::decay<T>::type> p;
367   auto f = p.getFuture();
368   p.setValue(std::forward<T>(t));
369   return std::move(f);
370 }
371
372 inline // for multiple translation units
373 Future<void> makeFuture() {
374   Promise<void> p;
375   auto f = p.getFuture();
376   p.setValue();
377   return std::move(f);
378 }
379
380 template <class F>
381 auto makeFutureTry(
382     F&& func,
383     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
384     -> Future<decltype(func())> {
385   Promise<decltype(func())> p;
386   auto f = p.getFuture();
387   p.fulfil(
388     [&func]() {
389       return (func)();
390     });
391   return std::move(f);
392 }
393
394 template <class F>
395 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
396   F copy = func;
397   return makeFutureTry(std::move(copy));
398 }
399
400 template <class T>
401 Future<T> makeFuture(std::exception_ptr const& e) {
402   Promise<T> p;
403   auto f = p.getFuture();
404   p.setException(e);
405   return std::move(f);
406 }
407
408 template <class T, class E>
409 typename std::enable_if<std::is_base_of<std::exception, E>::value,
410                         Future<T>>::type
411 makeFuture(E const& e) {
412   Promise<T> p;
413   auto f = p.getFuture();
414   p.fulfil([&]() -> T { throw e; });
415   return std::move(f);
416 }
417
418 template <class T>
419 Future<T> makeFuture(Try<T>&& t) {
420   try {
421     return makeFuture<T>(std::move(t.value()));
422   } catch (...) {
423     return makeFuture<T>(std::current_exception());
424   }
425 }
426
427 template <>
428 inline Future<void> makeFuture(Try<void>&& t) {
429   try {
430     t.throwIfFailed();
431     return makeFuture();
432   } catch (...) {
433     return makeFuture<void>(std::current_exception());
434   }
435 }
436
437 // via
438 template <typename Executor>
439 Future<void> via(Executor* executor) {
440   return makeFuture().via(executor);
441 }
442
443 // when (variadic)
444
445 template <typename... Fs>
446 typename detail::VariadicContext<
447   typename std::decay<Fs>::type::value_type...>::type
448 whenAll(Fs&&... fs)
449 {
450   auto ctx =
451     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
452   ctx->total = sizeof...(fs);
453   auto f_saved = ctx->p.getFuture();
454   detail::whenAllVariadicHelper(ctx,
455     std::forward<typename std::decay<Fs>::type>(fs)...);
456   return std::move(f_saved);
457 }
458
459 // when (iterator)
460
461 template <class InputIterator>
462 Future<
463   std::vector<
464   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
465 whenAll(InputIterator first, InputIterator last)
466 {
467   typedef
468     typename std::iterator_traits<InputIterator>::value_type::value_type T;
469
470   auto n = std::distance(first, last);
471   if (n == 0) {
472     return makeFuture(std::vector<Try<T>>());
473   }
474
475   auto ctx = new detail::WhenAllContext<T>();
476
477   ctx->results.resize(n);
478
479   auto f_saved = ctx->p.getFuture();
480
481   for (size_t i = 0; first != last; ++first, ++i) {
482      assert(i < n);
483      auto& f = *first;
484      f.setCallback_([ctx, i, n](Try<T>&& t) {
485          ctx->results[i] = std::move(t);
486          if (++ctx->count == n) {
487            ctx->p.setValue(std::move(ctx->results));
488            delete ctx;
489          }
490        });
491   }
492
493   return std::move(f_saved);
494 }
495
496 template <class InputIterator>
497 Future<
498   std::pair<size_t,
499             Try<
500               typename
501               std::iterator_traits<InputIterator>::value_type::value_type> > >
502 whenAny(InputIterator first, InputIterator last) {
503   typedef
504     typename std::iterator_traits<InputIterator>::value_type::value_type T;
505
506   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
507   auto f_saved = ctx->p.getFuture();
508
509   for (size_t i = 0; first != last; first++, i++) {
510     auto& f = *first;
511     f.setCallback_([i, ctx](Try<T>&& t) {
512       if (!ctx->done.exchange(true)) {
513         ctx->p.setValue(std::make_pair(i, std::move(t)));
514       }
515       ctx->decref();
516     });
517   }
518
519   return std::move(f_saved);
520 }
521
522 template <class InputIterator>
523 Future<std::vector<std::pair<size_t, Try<typename
524   std::iterator_traits<InputIterator>::value_type::value_type>>>>
525 whenN(InputIterator first, InputIterator last, size_t n) {
526   typedef typename
527     std::iterator_traits<InputIterator>::value_type::value_type T;
528   typedef std::vector<std::pair<size_t, Try<T>>> V;
529
530   struct ctx_t {
531     V v;
532     size_t completed;
533     Promise<V> p;
534   };
535   auto ctx = std::make_shared<ctx_t>();
536   ctx->completed = 0;
537
538   // for each completed Future, increase count and add to vector, until we
539   // have n completed futures at which point we fulfil our Promise with the
540   // vector
541   auto it = first;
542   size_t i = 0;
543   while (it != last) {
544     it->then([ctx, n, i](Try<T>&& t) {
545       auto& v = ctx->v;
546       auto c = ++ctx->completed;
547       if (c <= n) {
548         assert(ctx->v.size() < n);
549         v.push_back(std::make_pair(i, std::move(t)));
550         if (c == n) {
551           ctx->p.fulfilTry(Try<V>(std::move(v)));
552         }
553       }
554     });
555
556     it++;
557     i++;
558   }
559
560   if (i < n) {
561     ctx->p.setException(std::runtime_error("Not enough futures"));
562   }
563
564   return ctx->p.getFuture();
565 }
566
567 template <typename T>
568 Future<T>
569 waitWithSemaphore(Future<T>&& f) {
570   Baton<> baton;
571   auto done = f.then([&](Try<T> &&t) {
572     baton.post();
573     return std::move(t.value());
574   });
575   baton.wait();
576   while (!done.isReady()) {
577     // There's a race here between the return here and the actual finishing of
578     // the future. f is completed, but the setup may not have finished on done
579     // after the baton has posted.
580     std::this_thread::yield();
581   }
582   return done;
583 }
584
585 template<>
586 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
587   Baton<> baton;
588   auto done = f.then([&](Try<void> &&t) {
589     baton.post();
590     t.value();
591   });
592   baton.wait();
593   while (!done.isReady()) {
594     // There's a race here between the return here and the actual finishing of
595     // the future. f is completed, but the setup may not have finished on done
596     // after the baton has posted.
597     std::this_thread::yield();
598   }
599   return done;
600 }
601
602 template <typename T, class Duration>
603 Future<T>
604 waitWithSemaphore(Future<T>&& f, Duration timeout) {
605   auto baton = std::make_shared<Baton<>>();
606   auto done = f.then([baton](Try<T> &&t) {
607     baton->post();
608     return std::move(t.value());
609   });
610   baton->timed_wait(std::chrono::system_clock::now() + timeout);
611   return done;
612 }
613
614 template <class Duration>
615 Future<void>
616 waitWithSemaphore(Future<void>&& f, Duration timeout) {
617   auto baton = std::make_shared<Baton<>>();
618   auto done = f.then([baton](Try<void> &&t) {
619     baton->post();
620     t.value();
621   });
622   baton->timed_wait(std::chrono::system_clock::now() + timeout);
623   return done;
624 }
625
626 }}
627
628 // I haven't included a Future<T&> specialization because I don't forsee us
629 // using it, however it is not difficult to add when needed. Refer to
630 // Future<void> for guidance. std::future and boost::future code would also be
631 // instructive.