Creating a flatten() method to get the inner Future<T> value out of a Future<Future...
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
25
26 namespace folly {
27
28 class Timekeeper;
29
30 namespace detail {
31   Timekeeper* getTimekeeperSingleton();
32 }
33
34 template <class T>
35 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
36   *this = std::move(other);
37 }
38
39 template <class T>
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41   std::swap(core_, other.core_);
42   return *this;
43 }
44
45 template <class T>
46 template <class F>
47 Future<T>::Future(
48   const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
49     : core_(nullptr) {
50   Promise<F> p;
51   p.setValue(val);
52   *this = p.getFuture();
53 }
54
55 template <class T>
56 template <class F>
57 Future<T>::Future(
58   typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
59     : core_(nullptr) {
60   Promise<F> p;
61   p.setValue(std::forward<F>(val));
62   *this = p.getFuture();
63 }
64
65 template <>
66 template <class F,
67           typename std::enable_if<std::is_void<F>::value, int>::type>
68 Future<void>::Future() : core_(nullptr) {
69   Promise<void> p;
70   p.setValue();
71   *this = p.getFuture();
72 }
73
74
75 template <class T>
76 Future<T>::~Future() {
77   detach();
78 }
79
80 template <class T>
81 void Future<T>::detach() {
82   if (core_) {
83     core_->detachFuture();
84     core_ = nullptr;
85   }
86 }
87
88 template <class T>
89 void Future<T>::throwIfInvalid() const {
90   if (!core_)
91     throw NoState();
92 }
93
94 template <class T>
95 template <class F>
96 void Future<T>::setCallback_(F&& func) {
97   throwIfInvalid();
98   core_->setCallback(std::move(func));
99 }
100
101 // unwrap
102
103 template <class T>
104 template <class F>
105 typename std::enable_if<isFuture<F>::value,
106                         Future<typename isFuture<T>::Inner>>::type
107 Future<T>::unwrap() {
108   return then([](Future<typename isFuture<T>::Inner> internal_future) {
109       return internal_future;
110   });
111 }
112
113 // then
114
115 // Variant: returns a value
116 // e.g. f.then([](Try<T>&& t){ return t.value(); });
117 template <class T>
118 template <typename F, typename R, bool isTry, typename... Args>
119 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
120 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
121   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
122   typedef typename R::ReturnsFuture::Inner B;
123
124   throwIfInvalid();
125
126   // wrap these so we can move them into the lambda
127   folly::MoveWrapper<Promise<B>> p;
128   folly::MoveWrapper<F> funcm(std::forward<F>(func));
129
130   // grab the Future now before we lose our handle on the Promise
131   auto f = p->getFuture();
132   if (getExecutor()) {
133     f.setExecutor(getExecutor());
134   }
135
136   /* This is a bit tricky.
137
138      We can't just close over *this in case this Future gets moved. So we
139      make a new dummy Future. We could figure out something more
140      sophisticated that avoids making a new Future object when it can, as an
141      optimization. But this is correct.
142
143      core_ can't be moved, it is explicitly disallowed (as is copying). But
144      if there's ever a reason to allow it, this is one place that makes that
145      assumption and would need to be fixed. We use a standard shared pointer
146      for core_ (by copying it in), which means in essence obj holds a shared
147      pointer to itself.  But this shouldn't leak because Promise will not
148      outlive the continuation, because Promise will setException() with a
149      broken Promise if it is destructed before completed. We could use a
150      weak pointer but it would have to be converted to a shared pointer when
151      func is executed (because the Future returned by func may possibly
152      persist beyond the callback, if it gets moved), and so it is an
153      optimization to just make it shared from the get-go.
154
155      We have to move in the Promise and func using the MoveWrapper
156      hack. (func could be copied but it's a big drag on perf).
157
158      Two subtle but important points about this design. detail::Core has no
159      back pointers to Future or Promise, so if Future or Promise get moved
160      (and they will be moved in performant code) we don't have to do
161      anything fancy. And because we store the continuation in the
162      detail::Core, not in the Future, we can execute the continuation even
163      after the Future has gone out of scope. This is an intentional design
164      decision. It is likely we will want to be able to cancel a continuation
165      in some circumstances, but I think it should be explicit not implicit
166      in the destruction of the Future used to create it.
167      */
168   setCallback_(
169     [p, funcm](Try<T>&& t) mutable {
170       if (!isTry && t.hasException()) {
171         p->setException(std::move(t.exception()));
172       } else {
173         p->fulfil([&]() {
174           return (*funcm)(t.template get<isTry, Args>()...);
175         });
176       }
177     });
178
179   return f;
180 }
181
182 // Variant: returns a Future
183 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
184 template <class T>
185 template <typename F, typename R, bool isTry, typename... Args>
186 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
187 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
188   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
189   typedef typename R::ReturnsFuture::Inner B;
190
191   throwIfInvalid();
192
193   // wrap these so we can move them into the lambda
194   folly::MoveWrapper<Promise<B>> p;
195   folly::MoveWrapper<F> funcm(std::forward<F>(func));
196
197   // grab the Future now before we lose our handle on the Promise
198   auto f = p->getFuture();
199   if (getExecutor()) {
200     f.setExecutor(getExecutor());
201   }
202
203   setCallback_(
204     [p, funcm](Try<T>&& t) mutable {
205       if (!isTry && t.hasException()) {
206         p->setException(std::move(t.exception()));
207       } else {
208         try {
209           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
210           // that didn't throw, now we can steal p
211           f2.setCallback_([p](Try<B>&& b) mutable {
212             p->fulfilTry(std::move(b));
213           });
214         } catch (const std::exception& e) {
215           p->setException(exception_wrapper(std::current_exception(), e));
216         } catch (...) {
217           p->setException(exception_wrapper(std::current_exception()));
218         }
219       }
220     });
221
222   return f;
223 }
224
225 template <typename T>
226 template <typename R, typename Caller, typename... Args>
227   Future<typename isFuture<R>::Inner>
228 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
229   typedef typename std::remove_cv<
230     typename std::remove_reference<
231       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
232   return then([instance, func](Try<T>&& t){
233     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
234   });
235 }
236
237 template <class T>
238 Future<void> Future<T>::then() {
239   return then([] (Try<T>&& t) {});
240 }
241
242 // onError where the callback returns T
243 template <class T>
244 template <class F>
245 typename std::enable_if<
246   !detail::Extract<F>::ReturnsFuture::value,
247   Future<T>>::type
248 Future<T>::onError(F&& func) {
249   typedef typename detail::Extract<F>::FirstArg Exn;
250   static_assert(
251       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
252       "Return type of onError callback must be T or Future<T>");
253
254   Promise<T> p;
255   auto f = p.getFuture();
256   auto pm = folly::makeMoveWrapper(std::move(p));
257   auto funcm = folly::makeMoveWrapper(std::move(func));
258   setCallback_([pm, funcm](Try<T>&& t) mutable {
259     if (!t.template withException<Exn>([&] (Exn& e) {
260           pm->fulfil([&]{
261             return (*funcm)(e);
262           });
263         })) {
264       pm->fulfilTry(std::move(t));
265     }
266   });
267
268   return f;
269 }
270
271 // onError where the callback returns Future<T>
272 template <class T>
273 template <class F>
274 typename std::enable_if<
275   detail::Extract<F>::ReturnsFuture::value,
276   Future<T>>::type
277 Future<T>::onError(F&& func) {
278   static_assert(
279       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
280       "Return type of onError callback must be T or Future<T>");
281   typedef typename detail::Extract<F>::FirstArg Exn;
282
283   Promise<T> p;
284   auto f = p.getFuture();
285   auto pm = folly::makeMoveWrapper(std::move(p));
286   auto funcm = folly::makeMoveWrapper(std::move(func));
287   setCallback_([pm, funcm](Try<T>&& t) mutable {
288     if (!t.template withException<Exn>([&] (Exn& e) {
289           try {
290             auto f2 = (*funcm)(e);
291             f2.setCallback_([pm](Try<T>&& t2) mutable {
292               pm->fulfilTry(std::move(t2));
293             });
294           } catch (const std::exception& e2) {
295             pm->setException(exception_wrapper(std::current_exception(), e2));
296           } catch (...) {
297             pm->setException(exception_wrapper(std::current_exception()));
298           }
299         })) {
300       pm->fulfilTry(std::move(t));
301     }
302   });
303
304   return f;
305 }
306
307 template <class T>
308 template <class F>
309 Future<T> Future<T>::ensure(F func) {
310   MoveWrapper<F> funcw(std::move(func));
311   return this->then([funcw](Try<T>&& t) {
312     (*funcw)();
313     return makeFuture(std::move(t));
314   });
315 }
316
317 template <class T>
318 template <class F>
319 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
320   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
321   return within(dur, tk)
322     .onError([funcw](TimedOut const&) { return (*funcw)(); });
323 }
324
325 template <class T>
326 typename std::add_lvalue_reference<T>::type Future<T>::value() {
327   throwIfInvalid();
328
329   return core_->getTry().value();
330 }
331
332 template <class T>
333 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
334   throwIfInvalid();
335
336   return core_->getTry().value();
337 }
338
339 template <class T>
340 Try<T>& Future<T>::getTry() {
341   throwIfInvalid();
342
343   return core_->getTry();
344 }
345
346 template <class T>
347 template <typename Executor>
348 inline Future<T> Future<T>::via(Executor* executor) && {
349   throwIfInvalid();
350
351   setExecutor(executor);
352
353   return std::move(*this);
354 }
355
356 template <class T>
357 template <typename Executor>
358 inline Future<T> Future<T>::via(Executor* executor) & {
359   throwIfInvalid();
360
361   MoveWrapper<Promise<T>> p;
362   auto f = p->getFuture();
363   then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
364   return std::move(f).via(executor);
365 }
366
367 template <class T>
368 bool Future<T>::isReady() const {
369   throwIfInvalid();
370   return core_->ready();
371 }
372
373 template <class T>
374 void Future<T>::raise(exception_wrapper exception) {
375   core_->raise(std::move(exception));
376 }
377
378 // makeFuture
379
380 template <class T>
381 Future<typename std::decay<T>::type> makeFuture(T&& t) {
382   Promise<typename std::decay<T>::type> p;
383   p.setValue(std::forward<T>(t));
384   return p.getFuture();
385 }
386
387 inline // for multiple translation units
388 Future<void> makeFuture() {
389   Promise<void> p;
390   p.setValue();
391   return p.getFuture();
392 }
393
394 template <class F>
395 auto makeFutureTry(
396     F&& func,
397     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
398     -> Future<decltype(func())> {
399   Promise<decltype(func())> p;
400   p.fulfil(
401     [&func]() {
402       return (func)();
403     });
404   return p.getFuture();
405 }
406
407 template <class F>
408 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
409   F copy = func;
410   return makeFutureTry(std::move(copy));
411 }
412
413 template <class T>
414 Future<T> makeFuture(std::exception_ptr const& e) {
415   Promise<T> p;
416   p.setException(e);
417   return p.getFuture();
418 }
419
420 template <class T>
421 Future<T> makeFuture(exception_wrapper ew) {
422   Promise<T> p;
423   p.setException(std::move(ew));
424   return p.getFuture();
425 }
426
427 template <class T, class E>
428 typename std::enable_if<std::is_base_of<std::exception, E>::value,
429                         Future<T>>::type
430 makeFuture(E const& e) {
431   Promise<T> p;
432   p.setException(make_exception_wrapper<E>(e));
433   return p.getFuture();
434 }
435
436 template <class T>
437 Future<T> makeFuture(Try<T>&& t) {
438   Promise<typename std::decay<T>::type> p;
439   p.fulfilTry(std::move(t));
440   return p.getFuture();
441 }
442
443 template <>
444 inline Future<void> makeFuture(Try<void>&& t) {
445   if (t.hasException()) {
446     return makeFuture<void>(std::move(t.exception()));
447   } else {
448     return makeFuture();
449   }
450 }
451
452 // via
453 template <typename Executor>
454 Future<void> via(Executor* executor) {
455   return makeFuture().via(executor);
456 }
457
458 // when (variadic)
459
460 template <typename... Fs>
461 typename detail::VariadicContext<
462   typename std::decay<Fs>::type::value_type...>::type
463 whenAll(Fs&&... fs)
464 {
465   auto ctx =
466     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
467   ctx->total = sizeof...(fs);
468   auto f_saved = ctx->p.getFuture();
469   detail::whenAllVariadicHelper(ctx,
470     std::forward<typename std::decay<Fs>::type>(fs)...);
471   return f_saved;
472 }
473
474 // when (iterator)
475
476 template <class InputIterator>
477 Future<
478   std::vector<
479   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
480 whenAll(InputIterator first, InputIterator last)
481 {
482   typedef
483     typename std::iterator_traits<InputIterator>::value_type::value_type T;
484
485   if (first >= last) {
486     return makeFuture(std::vector<Try<T>>());
487   }
488   size_t n = std::distance(first, last);
489
490   auto ctx = new detail::WhenAllContext<T>();
491
492   ctx->results.resize(n);
493
494   auto f_saved = ctx->p.getFuture();
495
496   for (size_t i = 0; first != last; ++first, ++i) {
497      assert(i < n);
498      auto& f = *first;
499      f.setCallback_([ctx, i, n](Try<T>&& t) {
500          ctx->results[i] = std::move(t);
501          if (++ctx->count == n) {
502            ctx->p.setValue(std::move(ctx->results));
503            delete ctx;
504          }
505        });
506   }
507
508   return f_saved;
509 }
510
511 template <class InputIterator>
512 Future<
513   std::pair<size_t,
514             Try<
515               typename
516               std::iterator_traits<InputIterator>::value_type::value_type> > >
517 whenAny(InputIterator first, InputIterator last) {
518   typedef
519     typename std::iterator_traits<InputIterator>::value_type::value_type T;
520
521   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
522   auto f_saved = ctx->p.getFuture();
523
524   for (size_t i = 0; first != last; first++, i++) {
525     auto& f = *first;
526     f.setCallback_([i, ctx](Try<T>&& t) {
527       if (!ctx->done.exchange(true)) {
528         ctx->p.setValue(std::make_pair(i, std::move(t)));
529       }
530       ctx->decref();
531     });
532   }
533
534   return f_saved;
535 }
536
537 template <class InputIterator>
538 Future<std::vector<std::pair<size_t, Try<typename
539   std::iterator_traits<InputIterator>::value_type::value_type>>>>
540 whenN(InputIterator first, InputIterator last, size_t n) {
541   typedef typename
542     std::iterator_traits<InputIterator>::value_type::value_type T;
543   typedef std::vector<std::pair<size_t, Try<T>>> V;
544
545   struct ctx_t {
546     V v;
547     size_t completed;
548     Promise<V> p;
549   };
550   auto ctx = std::make_shared<ctx_t>();
551   ctx->completed = 0;
552
553   // for each completed Future, increase count and add to vector, until we
554   // have n completed futures at which point we fulfil our Promise with the
555   // vector
556   auto it = first;
557   size_t i = 0;
558   while (it != last) {
559     it->then([ctx, n, i](Try<T>&& t) {
560       auto& v = ctx->v;
561       auto c = ++ctx->completed;
562       if (c <= n) {
563         assert(ctx->v.size() < n);
564         v.push_back(std::make_pair(i, std::move(t)));
565         if (c == n) {
566           ctx->p.fulfilTry(Try<V>(std::move(v)));
567         }
568       }
569     });
570
571     it++;
572     i++;
573   }
574
575   if (i < n) {
576     ctx->p.setException(std::runtime_error("Not enough futures"));
577   }
578
579   return ctx->p.getFuture();
580 }
581
582 template <class T>
583 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
584   return within(dur, TimedOut(), tk);
585 }
586
587 template <class T>
588 template <class E>
589 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
590
591   struct Context {
592     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
593     E exception;
594     Promise<T> promise;
595     std::atomic<bool> token;
596   };
597   auto ctx = std::make_shared<Context>(std::move(e));
598
599   if (!tk) {
600     tk = folly::detail::getTimekeeperSingleton();
601   }
602
603   tk->after(dur)
604     .then([ctx](Try<void> const& t) {
605       if (ctx->token.exchange(true) == false) {
606         if (t.hasException()) {
607           ctx->promise.setException(std::move(t.exception()));
608         } else {
609           ctx->promise.setException(std::move(ctx->exception));
610         }
611       }
612     });
613
614   this->then([ctx](Try<T>&& t) {
615     if (ctx->token.exchange(true) == false) {
616       ctx->promise.fulfilTry(std::move(t));
617     }
618   });
619
620   return ctx->promise.getFuture();
621 }
622
623 template <class T>
624 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
625   return whenAll(*this, futures::sleep(dur, tk))
626     .then([](std::tuple<Try<T>, Try<void>> tup) {
627       Try<T>& t = std::get<0>(tup);
628       return makeFuture<T>(std::move(t));
629     });
630 }
631
632 namespace detail {
633
634 template <class T>
635 void waitImpl(Future<T>& f) {
636   // short-circuit if there's nothing to do
637   if (f.isReady()) return;
638
639   Baton<> baton;
640   f = f.then([&](Try<T> t) {
641     baton.post();
642     return makeFuture(std::move(t));
643   });
644   baton.wait();
645
646   // There's a race here between the return here and the actual finishing of
647   // the future. f is completed, but the setup may not have finished on done
648   // after the baton has posted.
649   while (!f.isReady()) {
650     std::this_thread::yield();
651   }
652 }
653
654 template <class T>
655 void waitImpl(Future<T>& f, Duration dur) {
656   // short-circuit if there's nothing to do
657   if (f.isReady()) return;
658
659   auto baton = std::make_shared<Baton<>>();
660   f = f.then([baton](Try<T> t) {
661     baton->post();
662     return makeFuture(std::move(t));
663   });
664
665   // Let's preserve the invariant that if we did not timeout (timed_wait returns
666   // true), then the returned Future is complete when it is returned to the
667   // caller. We need to wait out the race for that Future to complete.
668   if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
669     while (!f.isReady()) {
670       std::this_thread::yield();
671     }
672   }
673 }
674
675 template <class T>
676 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
677   while (!f.isReady()) {
678     e->drive();
679   }
680 }
681
682 } // detail
683
684 template <class T>
685 Future<T>& Future<T>::wait() & {
686   detail::waitImpl(*this);
687   return *this;
688 }
689
690 template <class T>
691 Future<T>&& Future<T>::wait() && {
692   detail::waitImpl(*this);
693   return std::move(*this);
694 }
695
696 template <class T>
697 Future<T>& Future<T>::wait(Duration dur) & {
698   detail::waitImpl(*this, dur);
699   return *this;
700 }
701
702 template <class T>
703 Future<T>&& Future<T>::wait(Duration dur) && {
704   detail::waitImpl(*this, dur);
705   return std::move(*this);
706 }
707
708 template <class T>
709 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
710   detail::waitViaImpl(*this, e);
711   return *this;
712 }
713
714 template <class T>
715 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
716   detail::waitViaImpl(*this, e);
717   return std::move(*this);
718 }
719
720 template <class T>
721 T Future<T>::get() {
722   return std::move(wait().value());
723 }
724
725 template <>
726 inline void Future<void>::get() {
727   wait().value();
728 }
729
730 template <class T>
731 T Future<T>::get(Duration dur) {
732   wait(dur);
733   if (isReady()) {
734     return std::move(value());
735   } else {
736     throw TimedOut();
737   }
738 }
739
740 template <>
741 inline void Future<void>::get(Duration dur) {
742   wait(dur);
743   if (isReady()) {
744     return;
745   } else {
746     throw TimedOut();
747   }
748 }
749
750 template <class T>
751 T Future<T>::getVia(DrivableExecutor* e) {
752   return std::move(waitVia(e).value());
753 }
754
755 template <>
756 inline void Future<void>::getVia(DrivableExecutor* e) {
757   waitVia(e).value();
758 }
759
760 template <class T>
761 Future<bool> Future<T>::willEqual(Future<T>& f) {
762   return whenAll(*this, f).then([](const std::tuple<Try<T>, Try<T>>& t) {
763     if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) {
764       return std::get<0>(t).value() == std::get<1>(t).value();
765     } else {
766       return false;
767       }
768   });
769 }
770
771 namespace futures {
772   namespace {
773     template <class Z>
774     Future<Z> chainHelper(Future<Z> f) {
775       return f;
776     }
777
778     template <class Z, class F, class Fn, class... Callbacks>
779     Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
780       return chainHelper<Z>(f.then(fn), fns...);
781     }
782   }
783
784   template <class A, class Z, class... Callbacks>
785   std::function<Future<Z>(Try<A>)>
786   chain(Callbacks... fns) {
787     MoveWrapper<Promise<A>> pw;
788     MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
789     return [=](Try<A> t) mutable {
790       pw->fulfilTry(std::move(t));
791       return std::move(*fw);
792     };
793   }
794
795 }
796
797 } // namespace folly
798
799 // I haven't included a Future<T&> specialization because I don't forsee us
800 // using it, however it is not difficult to add when needed. Refer to
801 // Future<void> for guidance. std::future and boost::future code would also be
802 // instructive.