a7183180dbc9bc387a9ce19ec5b78e2120c5aa95
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
25
26 namespace folly {
27
28 class Timekeeper;
29
30 namespace detail {
31   Timekeeper* getTimekeeperSingleton();
32 }
33
34 template <class T>
35 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
36   *this = std::move(other);
37 }
38
39 template <class T>
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41   std::swap(core_, other.core_);
42   return *this;
43 }
44
45 template <class T>
46 template <class F>
47 Future<T>::Future(
48   const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
49     : core_(nullptr) {
50   Promise<F> p;
51   p.setValue(val);
52   *this = p.getFuture();
53 }
54
55 template <class T>
56 template <class F>
57 Future<T>::Future(
58   typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
59     : core_(nullptr) {
60   Promise<F> p;
61   p.setValue(std::forward<F>(val));
62   *this = p.getFuture();
63 }
64
65 template <>
66 template <class F,
67           typename std::enable_if<std::is_void<F>::value, int>::type>
68 Future<void>::Future() : core_(nullptr) {
69   Promise<void> p;
70   p.setValue();
71   *this = p.getFuture();
72 }
73
74
75 template <class T>
76 Future<T>::~Future() {
77   detach();
78 }
79
80 template <class T>
81 void Future<T>::detach() {
82   if (core_) {
83     core_->detachFuture();
84     core_ = nullptr;
85   }
86 }
87
88 template <class T>
89 void Future<T>::throwIfInvalid() const {
90   if (!core_)
91     throw NoState();
92 }
93
94 template <class T>
95 template <class F>
96 void Future<T>::setCallback_(F&& func) {
97   throwIfInvalid();
98   core_->setCallback(std::move(func));
99 }
100
101 // Variant: returns a value
102 // e.g. f.then([](Try<T>&& t){ return t.value(); });
103 template <class T>
104 template <typename F, typename R, bool isTry, typename... Args>
105 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
106 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
107   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
108   typedef typename R::ReturnsFuture::Inner B;
109
110   throwIfInvalid();
111
112   // wrap these so we can move them into the lambda
113   folly::MoveWrapper<Promise<B>> p;
114   folly::MoveWrapper<F> funcm(std::forward<F>(func));
115
116   // grab the Future now before we lose our handle on the Promise
117   auto f = p->getFuture();
118   if (getExecutor()) {
119     f.setExecutor(getExecutor());
120   }
121
122   /* This is a bit tricky.
123
124      We can't just close over *this in case this Future gets moved. So we
125      make a new dummy Future. We could figure out something more
126      sophisticated that avoids making a new Future object when it can, as an
127      optimization. But this is correct.
128
129      core_ can't be moved, it is explicitly disallowed (as is copying). But
130      if there's ever a reason to allow it, this is one place that makes that
131      assumption and would need to be fixed. We use a standard shared pointer
132      for core_ (by copying it in), which means in essence obj holds a shared
133      pointer to itself.  But this shouldn't leak because Promise will not
134      outlive the continuation, because Promise will setException() with a
135      broken Promise if it is destructed before completed. We could use a
136      weak pointer but it would have to be converted to a shared pointer when
137      func is executed (because the Future returned by func may possibly
138      persist beyond the callback, if it gets moved), and so it is an
139      optimization to just make it shared from the get-go.
140
141      We have to move in the Promise and func using the MoveWrapper
142      hack. (func could be copied but it's a big drag on perf).
143
144      Two subtle but important points about this design. detail::Core has no
145      back pointers to Future or Promise, so if Future or Promise get moved
146      (and they will be moved in performant code) we don't have to do
147      anything fancy. And because we store the continuation in the
148      detail::Core, not in the Future, we can execute the continuation even
149      after the Future has gone out of scope. This is an intentional design
150      decision. It is likely we will want to be able to cancel a continuation
151      in some circumstances, but I think it should be explicit not implicit
152      in the destruction of the Future used to create it.
153      */
154   setCallback_(
155     [p, funcm](Try<T>&& t) mutable {
156       if (!isTry && t.hasException()) {
157         p->setException(std::move(t.exception()));
158       } else {
159         p->fulfil([&]() {
160           return (*funcm)(t.template get<isTry, Args>()...);
161         });
162       }
163     });
164
165   return f;
166 }
167
168 // Variant: returns a Future
169 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
170 template <class T>
171 template <typename F, typename R, bool isTry, typename... Args>
172 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
173 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
174   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
175   typedef typename R::ReturnsFuture::Inner B;
176
177   throwIfInvalid();
178
179   // wrap these so we can move them into the lambda
180   folly::MoveWrapper<Promise<B>> p;
181   folly::MoveWrapper<F> funcm(std::forward<F>(func));
182
183   // grab the Future now before we lose our handle on the Promise
184   auto f = p->getFuture();
185   if (getExecutor()) {
186     f.setExecutor(getExecutor());
187   }
188
189   setCallback_(
190     [p, funcm](Try<T>&& t) mutable {
191       if (!isTry && t.hasException()) {
192         p->setException(std::move(t.exception()));
193       } else {
194         try {
195           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
196           // that didn't throw, now we can steal p
197           f2.setCallback_([p](Try<B>&& b) mutable {
198             p->fulfilTry(std::move(b));
199           });
200         } catch (const std::exception& e) {
201           p->setException(exception_wrapper(std::current_exception(), e));
202         } catch (...) {
203           p->setException(exception_wrapper(std::current_exception()));
204         }
205       }
206     });
207
208   return f;
209 }
210
211 template <typename T>
212 template <typename R, typename Caller, typename... Args>
213   Future<typename isFuture<R>::Inner>
214 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
215   typedef typename std::remove_cv<
216     typename std::remove_reference<
217       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
218   return then([instance, func](Try<T>&& t){
219     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
220   });
221 }
222
223 template <class T>
224 Future<void> Future<T>::then() {
225   return then([] (Try<T>&& t) {});
226 }
227
228 // onError where the callback returns T
229 template <class T>
230 template <class F>
231 typename std::enable_if<
232   !detail::Extract<F>::ReturnsFuture::value,
233   Future<T>>::type
234 Future<T>::onError(F&& func) {
235   typedef typename detail::Extract<F>::FirstArg Exn;
236   static_assert(
237       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
238       "Return type of onError callback must be T or Future<T>");
239
240   Promise<T> p;
241   auto f = p.getFuture();
242   auto pm = folly::makeMoveWrapper(std::move(p));
243   auto funcm = folly::makeMoveWrapper(std::move(func));
244   setCallback_([pm, funcm](Try<T>&& t) mutable {
245     if (!t.template withException<Exn>([&] (Exn& e) {
246           pm->fulfil([&]{
247             return (*funcm)(e);
248           });
249         })) {
250       pm->fulfilTry(std::move(t));
251     }
252   });
253
254   return f;
255 }
256
257 // onError where the callback returns Future<T>
258 template <class T>
259 template <class F>
260 typename std::enable_if<
261   detail::Extract<F>::ReturnsFuture::value,
262   Future<T>>::type
263 Future<T>::onError(F&& func) {
264   static_assert(
265       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
266       "Return type of onError callback must be T or Future<T>");
267   typedef typename detail::Extract<F>::FirstArg Exn;
268
269   Promise<T> p;
270   auto f = p.getFuture();
271   auto pm = folly::makeMoveWrapper(std::move(p));
272   auto funcm = folly::makeMoveWrapper(std::move(func));
273   setCallback_([pm, funcm](Try<T>&& t) mutable {
274     if (!t.template withException<Exn>([&] (Exn& e) {
275           try {
276             auto f2 = (*funcm)(e);
277             f2.setCallback_([pm](Try<T>&& t2) mutable {
278               pm->fulfilTry(std::move(t2));
279             });
280           } catch (const std::exception& e2) {
281             pm->setException(exception_wrapper(std::current_exception(), e2));
282           } catch (...) {
283             pm->setException(exception_wrapper(std::current_exception()));
284           }
285         })) {
286       pm->fulfilTry(std::move(t));
287     }
288   });
289
290   return f;
291 }
292
293 template <class T>
294 template <class F>
295 Future<T> Future<T>::ensure(F func) {
296   MoveWrapper<F> funcw(std::move(func));
297   return this->then([funcw](Try<T>&& t) {
298     (*funcw)();
299     return makeFuture(std::move(t));
300   });
301 }
302
303 template <class T>
304 template <class F>
305 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
306   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
307   return within(dur, tk)
308     .onError([funcw](TimedOut const&) { return (*funcw)(); });
309 }
310
311 template <class T>
312 typename std::add_lvalue_reference<T>::type Future<T>::value() {
313   throwIfInvalid();
314
315   return core_->getTry().value();
316 }
317
318 template <class T>
319 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
320   throwIfInvalid();
321
322   return core_->getTry().value();
323 }
324
325 template <class T>
326 Try<T>& Future<T>::getTry() {
327   throwIfInvalid();
328
329   return core_->getTry();
330 }
331
332 template <class T>
333 template <typename Executor>
334 inline Future<T> Future<T>::via(Executor* executor) && {
335   throwIfInvalid();
336
337   setExecutor(executor);
338
339   return std::move(*this);
340 }
341
342 template <class T>
343 template <typename Executor>
344 inline Future<T> Future<T>::via(Executor* executor) & {
345   throwIfInvalid();
346
347   MoveWrapper<Promise<T>> p;
348   auto f = p->getFuture();
349   then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
350   return std::move(f).via(executor);
351 }
352
353 template <class T>
354 bool Future<T>::isReady() const {
355   throwIfInvalid();
356   return core_->ready();
357 }
358
359 template <class T>
360 void Future<T>::raise(exception_wrapper exception) {
361   core_->raise(std::move(exception));
362 }
363
364 // makeFuture
365
366 template <class T>
367 Future<typename std::decay<T>::type> makeFuture(T&& t) {
368   Promise<typename std::decay<T>::type> p;
369   p.setValue(std::forward<T>(t));
370   return p.getFuture();
371 }
372
373 inline // for multiple translation units
374 Future<void> makeFuture() {
375   Promise<void> p;
376   p.setValue();
377   return p.getFuture();
378 }
379
380 template <class F>
381 auto makeFutureTry(
382     F&& func,
383     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
384     -> Future<decltype(func())> {
385   Promise<decltype(func())> p;
386   p.fulfil(
387     [&func]() {
388       return (func)();
389     });
390   return p.getFuture();
391 }
392
393 template <class F>
394 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
395   F copy = func;
396   return makeFutureTry(std::move(copy));
397 }
398
399 template <class T>
400 Future<T> makeFuture(std::exception_ptr const& e) {
401   Promise<T> p;
402   p.setException(e);
403   return p.getFuture();
404 }
405
406 template <class T>
407 Future<T> makeFuture(exception_wrapper ew) {
408   Promise<T> p;
409   p.setException(std::move(ew));
410   return p.getFuture();
411 }
412
413 template <class T, class E>
414 typename std::enable_if<std::is_base_of<std::exception, E>::value,
415                         Future<T>>::type
416 makeFuture(E const& e) {
417   Promise<T> p;
418   p.setException(make_exception_wrapper<E>(e));
419   return p.getFuture();
420 }
421
422 template <class T>
423 Future<T> makeFuture(Try<T>&& t) {
424   Promise<typename std::decay<T>::type> p;
425   p.fulfilTry(std::move(t));
426   return p.getFuture();
427 }
428
429 template <>
430 inline Future<void> makeFuture(Try<void>&& t) {
431   if (t.hasException()) {
432     return makeFuture<void>(std::move(t.exception()));
433   } else {
434     return makeFuture();
435   }
436 }
437
438 // via
439 template <typename Executor>
440 Future<void> via(Executor* executor) {
441   return makeFuture().via(executor);
442 }
443
444 // when (variadic)
445
446 template <typename... Fs>
447 typename detail::VariadicContext<
448   typename std::decay<Fs>::type::value_type...>::type
449 whenAll(Fs&&... fs)
450 {
451   auto ctx =
452     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
453   ctx->total = sizeof...(fs);
454   auto f_saved = ctx->p.getFuture();
455   detail::whenAllVariadicHelper(ctx,
456     std::forward<typename std::decay<Fs>::type>(fs)...);
457   return f_saved;
458 }
459
460 // when (iterator)
461
462 template <class InputIterator>
463 Future<
464   std::vector<
465   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
466 whenAll(InputIterator first, InputIterator last)
467 {
468   typedef
469     typename std::iterator_traits<InputIterator>::value_type::value_type T;
470
471   if (first >= last) {
472     return makeFuture(std::vector<Try<T>>());
473   }
474   size_t n = std::distance(first, last);
475
476   auto ctx = new detail::WhenAllContext<T>();
477
478   ctx->results.resize(n);
479
480   auto f_saved = ctx->p.getFuture();
481
482   for (size_t i = 0; first != last; ++first, ++i) {
483      assert(i < n);
484      auto& f = *first;
485      f.setCallback_([ctx, i, n](Try<T>&& t) {
486          ctx->results[i] = std::move(t);
487          if (++ctx->count == n) {
488            ctx->p.setValue(std::move(ctx->results));
489            delete ctx;
490          }
491        });
492   }
493
494   return f_saved;
495 }
496
497 template <class InputIterator>
498 Future<
499   std::pair<size_t,
500             Try<
501               typename
502               std::iterator_traits<InputIterator>::value_type::value_type> > >
503 whenAny(InputIterator first, InputIterator last) {
504   typedef
505     typename std::iterator_traits<InputIterator>::value_type::value_type T;
506
507   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
508   auto f_saved = ctx->p.getFuture();
509
510   for (size_t i = 0; first != last; first++, i++) {
511     auto& f = *first;
512     f.setCallback_([i, ctx](Try<T>&& t) {
513       if (!ctx->done.exchange(true)) {
514         ctx->p.setValue(std::make_pair(i, std::move(t)));
515       }
516       ctx->decref();
517     });
518   }
519
520   return f_saved;
521 }
522
523 template <class InputIterator>
524 Future<std::vector<std::pair<size_t, Try<typename
525   std::iterator_traits<InputIterator>::value_type::value_type>>>>
526 whenN(InputIterator first, InputIterator last, size_t n) {
527   typedef typename
528     std::iterator_traits<InputIterator>::value_type::value_type T;
529   typedef std::vector<std::pair<size_t, Try<T>>> V;
530
531   struct ctx_t {
532     V v;
533     size_t completed;
534     Promise<V> p;
535   };
536   auto ctx = std::make_shared<ctx_t>();
537   ctx->completed = 0;
538
539   // for each completed Future, increase count and add to vector, until we
540   // have n completed futures at which point we fulfil our Promise with the
541   // vector
542   auto it = first;
543   size_t i = 0;
544   while (it != last) {
545     it->then([ctx, n, i](Try<T>&& t) {
546       auto& v = ctx->v;
547       auto c = ++ctx->completed;
548       if (c <= n) {
549         assert(ctx->v.size() < n);
550         v.push_back(std::make_pair(i, std::move(t)));
551         if (c == n) {
552           ctx->p.fulfilTry(Try<V>(std::move(v)));
553         }
554       }
555     });
556
557     it++;
558     i++;
559   }
560
561   if (i < n) {
562     ctx->p.setException(std::runtime_error("Not enough futures"));
563   }
564
565   return ctx->p.getFuture();
566 }
567
568 template <class T>
569 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
570   return within(dur, TimedOut(), tk);
571 }
572
573 template <class T>
574 template <class E>
575 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
576
577   struct Context {
578     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
579     E exception;
580     Promise<T> promise;
581     std::atomic<bool> token;
582   };
583   auto ctx = std::make_shared<Context>(std::move(e));
584
585   if (!tk) {
586     tk = folly::detail::getTimekeeperSingleton();
587   }
588
589   tk->after(dur)
590     .then([ctx](Try<void> const& t) {
591       if (ctx->token.exchange(true) == false) {
592         if (t.hasException()) {
593           ctx->promise.setException(std::move(t.exception()));
594         } else {
595           ctx->promise.setException(std::move(ctx->exception));
596         }
597       }
598     });
599
600   this->then([ctx](Try<T>&& t) {
601     if (ctx->token.exchange(true) == false) {
602       ctx->promise.fulfilTry(std::move(t));
603     }
604   });
605
606   return ctx->promise.getFuture();
607 }
608
609 template <class T>
610 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
611   return whenAll(*this, futures::sleep(dur, tk))
612     .then([](std::tuple<Try<T>, Try<void>> tup) {
613       Try<T>& t = std::get<0>(tup);
614       return makeFuture<T>(std::move(t));
615     });
616 }
617
618 namespace detail {
619
620 template <class T>
621 void waitImpl(Future<T>& f) {
622   // short-circuit if there's nothing to do
623   if (f.isReady()) return;
624
625   Baton<> baton;
626   f = f.then([&](Try<T> t) {
627     baton.post();
628     return makeFuture(std::move(t));
629   });
630   baton.wait();
631
632   // There's a race here between the return here and the actual finishing of
633   // the future. f is completed, but the setup may not have finished on done
634   // after the baton has posted.
635   while (!f.isReady()) {
636     std::this_thread::yield();
637   }
638 }
639
640 template <class T>
641 void waitImpl(Future<T>& f, Duration dur) {
642   // short-circuit if there's nothing to do
643   if (f.isReady()) return;
644
645   auto baton = std::make_shared<Baton<>>();
646   f = f.then([baton](Try<T> t) {
647     baton->post();
648     return makeFuture(std::move(t));
649   });
650
651   // Let's preserve the invariant that if we did not timeout (timed_wait returns
652   // true), then the returned Future is complete when it is returned to the
653   // caller. We need to wait out the race for that Future to complete.
654   if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
655     while (!f.isReady()) {
656       std::this_thread::yield();
657     }
658   }
659 }
660
661 template <class T>
662 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
663   while (!f.isReady()) {
664     e->drive();
665   }
666 }
667
668 } // detail
669
670 template <class T>
671 Future<T>& Future<T>::wait() & {
672   detail::waitImpl(*this);
673   return *this;
674 }
675
676 template <class T>
677 Future<T>&& Future<T>::wait() && {
678   detail::waitImpl(*this);
679   return std::move(*this);
680 }
681
682 template <class T>
683 Future<T>& Future<T>::wait(Duration dur) & {
684   detail::waitImpl(*this, dur);
685   return *this;
686 }
687
688 template <class T>
689 Future<T>&& Future<T>::wait(Duration dur) && {
690   detail::waitImpl(*this, dur);
691   return std::move(*this);
692 }
693
694 template <class T>
695 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
696   detail::waitViaImpl(*this, e);
697   return *this;
698 }
699
700 template <class T>
701 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
702   detail::waitViaImpl(*this, e);
703   return std::move(*this);
704 }
705
706 template <class T>
707 T Future<T>::get() {
708   return std::move(wait().value());
709 }
710
711 template <>
712 inline void Future<void>::get() {
713   wait().value();
714 }
715
716 template <class T>
717 T Future<T>::get(Duration dur) {
718   wait(dur);
719   if (isReady()) {
720     return std::move(value());
721   } else {
722     throw TimedOut();
723   }
724 }
725
726 template <>
727 inline void Future<void>::get(Duration dur) {
728   wait(dur);
729   if (isReady()) {
730     return;
731   } else {
732     throw TimedOut();
733   }
734 }
735
736 template <class T>
737 T Future<T>::getVia(DrivableExecutor* e) {
738   return std::move(waitVia(e).value());
739 }
740
741 template <>
742 inline void Future<void>::getVia(DrivableExecutor* e) {
743   waitVia(e).value();
744 }
745
746 namespace futures {
747
748   namespace {
749     template <class Z>
750     Future<Z> chainHelper(Future<Z> f) {
751       return f;
752     }
753
754     template <class Z, class F, class Fn, class... Callbacks>
755     Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
756       return chainHelper<Z>(f.then(fn), fns...);
757     }
758   }
759
760   template <class A, class Z, class... Callbacks>
761   std::function<Future<Z>(Try<A>)>
762   chain(Callbacks... fns) {
763     MoveWrapper<Promise<A>> pw;
764     MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
765     return [=](Try<A> t) mutable {
766       pw->fulfilTry(std::move(t));
767       return std::move(*fw);
768     };
769   }
770
771 }
772
773 } // namespace folly
774
775 // I haven't included a Future<T&> specialization because I don't forsee us
776 // using it, however it is not difficult to add when needed. Refer to
777 // Future<void> for guidance. std::future and boost::future code would also be
778 // instructive.