Future<T>::onTimeout(Duration, function<T()>, Timekeeper*=nullptr)
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
25
26 namespace folly {
27
28 class Timekeeper;
29
30 namespace detail {
31   Timekeeper* getTimekeeperSingleton();
32 }
33
34 template <class T>
35 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
36   *this = std::move(other);
37 }
38
39 template <class T>
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41   std::swap(core_, other.core_);
42   return *this;
43 }
44
45 template <class T>
46 template <class F>
47 Future<T>::Future(
48   const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
49     : core_(nullptr) {
50   Promise<F> p;
51   p.setValue(val);
52   *this = p.getFuture();
53 }
54
55 template <class T>
56 template <class F>
57 Future<T>::Future(
58   typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
59     : core_(nullptr) {
60   Promise<F> p;
61   p.setValue(std::forward<F>(val));
62   *this = p.getFuture();
63 }
64
65 template <>
66 template <class F,
67           typename std::enable_if<std::is_void<F>::value, int>::type>
68 Future<void>::Future() : core_(nullptr) {
69   Promise<void> p;
70   p.setValue();
71   *this = p.getFuture();
72 }
73
74
75 template <class T>
76 Future<T>::~Future() {
77   detach();
78 }
79
80 template <class T>
81 void Future<T>::detach() {
82   if (core_) {
83     core_->detachFuture();
84     core_ = nullptr;
85   }
86 }
87
88 template <class T>
89 void Future<T>::throwIfInvalid() const {
90   if (!core_)
91     throw NoState();
92 }
93
94 template <class T>
95 template <class F>
96 void Future<T>::setCallback_(F&& func) {
97   throwIfInvalid();
98   core_->setCallback(std::move(func));
99 }
100
101 // Variant: returns a value
102 // e.g. f.then([](Try<T>&& t){ return t.value(); });
103 template <class T>
104 template <typename F, typename R, bool isTry, typename... Args>
105 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
106 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
107   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
108   typedef typename R::ReturnsFuture::Inner B;
109
110   throwIfInvalid();
111
112   // wrap these so we can move them into the lambda
113   folly::MoveWrapper<Promise<B>> p;
114   folly::MoveWrapper<F> funcm(std::forward<F>(func));
115
116   // grab the Future now before we lose our handle on the Promise
117   auto f = p->getFuture();
118
119   /* This is a bit tricky.
120
121      We can't just close over *this in case this Future gets moved. So we
122      make a new dummy Future. We could figure out something more
123      sophisticated that avoids making a new Future object when it can, as an
124      optimization. But this is correct.
125
126      core_ can't be moved, it is explicitly disallowed (as is copying). But
127      if there's ever a reason to allow it, this is one place that makes that
128      assumption and would need to be fixed. We use a standard shared pointer
129      for core_ (by copying it in), which means in essence obj holds a shared
130      pointer to itself.  But this shouldn't leak because Promise will not
131      outlive the continuation, because Promise will setException() with a
132      broken Promise if it is destructed before completed. We could use a
133      weak pointer but it would have to be converted to a shared pointer when
134      func is executed (because the Future returned by func may possibly
135      persist beyond the callback, if it gets moved), and so it is an
136      optimization to just make it shared from the get-go.
137
138      We have to move in the Promise and func using the MoveWrapper
139      hack. (func could be copied but it's a big drag on perf).
140
141      Two subtle but important points about this design. detail::Core has no
142      back pointers to Future or Promise, so if Future or Promise get moved
143      (and they will be moved in performant code) we don't have to do
144      anything fancy. And because we store the continuation in the
145      detail::Core, not in the Future, we can execute the continuation even
146      after the Future has gone out of scope. This is an intentional design
147      decision. It is likely we will want to be able to cancel a continuation
148      in some circumstances, but I think it should be explicit not implicit
149      in the destruction of the Future used to create it.
150      */
151   setCallback_(
152     [p, funcm](Try<T>&& t) mutable {
153       if (!isTry && t.hasException()) {
154         p->setException(std::move(t.exception()));
155       } else {
156         p->fulfil([&]() {
157           return (*funcm)(t.template get<isTry, Args>()...);
158         });
159       }
160     });
161
162   return f;
163 }
164
165 // Variant: returns a Future
166 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
167 template <class T>
168 template <typename F, typename R, bool isTry, typename... Args>
169 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
170 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
171   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
172   typedef typename R::ReturnsFuture::Inner B;
173
174   throwIfInvalid();
175
176   // wrap these so we can move them into the lambda
177   folly::MoveWrapper<Promise<B>> p;
178   folly::MoveWrapper<F> funcm(std::forward<F>(func));
179
180   // grab the Future now before we lose our handle on the Promise
181   auto f = p->getFuture();
182
183   setCallback_(
184     [p, funcm](Try<T>&& t) mutable {
185       if (!isTry && t.hasException()) {
186         p->setException(std::move(t.exception()));
187       } else {
188         try {
189           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
190           // that didn't throw, now we can steal p
191           f2.setCallback_([p](Try<B>&& b) mutable {
192             p->fulfilTry(std::move(b));
193           });
194         } catch (const std::exception& e) {
195           p->setException(exception_wrapper(std::current_exception()));
196         }
197       }
198     });
199
200   return f;
201 }
202
203 template <typename T>
204 template <typename Caller, typename R, typename... Args>
205   Future<typename isFuture<R>::Inner>
206 Future<T>::then(Caller *instance, R(Caller::*func)(Args...)) {
207   typedef typename std::remove_cv<
208     typename std::remove_reference<
209       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
210   return then([instance, func](Try<T>&& t){
211     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
212   });
213 }
214
215 template <class T>
216 Future<void> Future<T>::then() {
217   return then([] (Try<T>&& t) {});
218 }
219
220 // onError where the callback returns T
221 template <class T>
222 template <class F>
223 typename std::enable_if<
224   !detail::Extract<F>::ReturnsFuture::value,
225   Future<T>>::type
226 Future<T>::onError(F&& func) {
227   typedef typename detail::Extract<F>::FirstArg Exn;
228   static_assert(
229       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
230       "Return type of onError callback must be T or Future<T>");
231
232   Promise<T> p;
233   auto f = p.getFuture();
234   auto pm = folly::makeMoveWrapper(std::move(p));
235   auto funcm = folly::makeMoveWrapper(std::move(func));
236   setCallback_([pm, funcm](Try<T>&& t) mutable {
237     if (!t.template withException<Exn>([&] (Exn& e) {
238           pm->fulfil([&]{
239             return (*funcm)(e);
240           });
241         })) {
242       pm->fulfilTry(std::move(t));
243     }
244   });
245
246   return f;
247 }
248
249 // onError where the callback returns Future<T>
250 template <class T>
251 template <class F>
252 typename std::enable_if<
253   detail::Extract<F>::ReturnsFuture::value,
254   Future<T>>::type
255 Future<T>::onError(F&& func) {
256   static_assert(
257       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
258       "Return type of onError callback must be T or Future<T>");
259   typedef typename detail::Extract<F>::FirstArg Exn;
260
261   Promise<T> p;
262   auto f = p.getFuture();
263   auto pm = folly::makeMoveWrapper(std::move(p));
264   auto funcm = folly::makeMoveWrapper(std::move(func));
265   setCallback_([pm, funcm](Try<T>&& t) mutable {
266     if (!t.template withException<Exn>([&] (Exn& e) {
267           try {
268             auto f2 = (*funcm)(e);
269             f2.setCallback_([pm](Try<T>&& t2) mutable {
270               pm->fulfilTry(std::move(t2));
271             });
272           } catch (const std::exception& e2) {
273             pm->setException(exception_wrapper(std::current_exception(), e2));
274           } catch (...) {
275             pm->setException(exception_wrapper(std::current_exception()));
276           }
277         })) {
278       pm->fulfilTry(std::move(t));
279     }
280   });
281
282   return f;
283 }
284
285 template <class T>
286 template <class F>
287 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
288   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
289   return within(dur, tk)
290     .onError([funcw](TimedOut const&) { return (*funcw)(); });
291 }
292
293 template <class T>
294 typename std::add_lvalue_reference<T>::type Future<T>::value() {
295   throwIfInvalid();
296
297   return core_->getTry().value();
298 }
299
300 template <class T>
301 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
302   throwIfInvalid();
303
304   return core_->getTry().value();
305 }
306
307 template <class T>
308 Try<T>& Future<T>::getTry() {
309   throwIfInvalid();
310
311   return core_->getTry();
312 }
313
314 template <class T>
315 template <typename Executor>
316 inline Future<T> Future<T>::via(Executor* executor) && {
317   throwIfInvalid();
318
319   this->deactivate();
320   core_->setExecutor(executor);
321
322   return std::move(*this);
323 }
324
325 template <class T>
326 template <typename Executor>
327 inline Future<T> Future<T>::via(Executor* executor) & {
328   throwIfInvalid();
329
330   MoveWrapper<Promise<T>> p;
331   auto f = p->getFuture();
332   then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
333   return std::move(f).via(executor);
334 }
335
336 template <class T>
337 bool Future<T>::isReady() const {
338   throwIfInvalid();
339   return core_->ready();
340 }
341
342 template <class T>
343 void Future<T>::raise(exception_wrapper exception) {
344   core_->raise(std::move(exception));
345 }
346
347 // makeFuture
348
349 template <class T>
350 Future<typename std::decay<T>::type> makeFuture(T&& t) {
351   Promise<typename std::decay<T>::type> p;
352   p.setValue(std::forward<T>(t));
353   return p.getFuture();
354 }
355
356 inline // for multiple translation units
357 Future<void> makeFuture() {
358   Promise<void> p;
359   p.setValue();
360   return p.getFuture();
361 }
362
363 template <class F>
364 auto makeFutureTry(
365     F&& func,
366     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
367     -> Future<decltype(func())> {
368   Promise<decltype(func())> p;
369   p.fulfil(
370     [&func]() {
371       return (func)();
372     });
373   return p.getFuture();
374 }
375
376 template <class F>
377 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
378   F copy = func;
379   return makeFutureTry(std::move(copy));
380 }
381
382 template <class T>
383 Future<T> makeFuture(std::exception_ptr const& e) {
384   Promise<T> p;
385   p.setException(e);
386   return p.getFuture();
387 }
388
389 template <class T>
390 Future<T> makeFuture(exception_wrapper ew) {
391   Promise<T> p;
392   p.setException(std::move(ew));
393   return p.getFuture();
394 }
395
396 template <class T, class E>
397 typename std::enable_if<std::is_base_of<std::exception, E>::value,
398                         Future<T>>::type
399 makeFuture(E const& e) {
400   Promise<T> p;
401   p.setException(make_exception_wrapper<E>(e));
402   return p.getFuture();
403 }
404
405 template <class T>
406 Future<T> makeFuture(Try<T>&& t) {
407   Promise<typename std::decay<T>::type> p;
408   p.fulfilTry(std::move(t));
409   return p.getFuture();
410 }
411
412 template <>
413 inline Future<void> makeFuture(Try<void>&& t) {
414   if (t.hasException()) {
415     return makeFuture<void>(std::move(t.exception()));
416   } else {
417     return makeFuture();
418   }
419 }
420
421 // via
422 template <typename Executor>
423 Future<void> via(Executor* executor) {
424   return makeFuture().via(executor);
425 }
426
427 // when (variadic)
428
429 template <typename... Fs>
430 typename detail::VariadicContext<
431   typename std::decay<Fs>::type::value_type...>::type
432 whenAll(Fs&&... fs)
433 {
434   auto ctx =
435     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
436   ctx->total = sizeof...(fs);
437   auto f_saved = ctx->p.getFuture();
438   detail::whenAllVariadicHelper(ctx,
439     std::forward<typename std::decay<Fs>::type>(fs)...);
440   return f_saved;
441 }
442
443 // when (iterator)
444
445 template <class InputIterator>
446 Future<
447   std::vector<
448   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
449 whenAll(InputIterator first, InputIterator last)
450 {
451   typedef
452     typename std::iterator_traits<InputIterator>::value_type::value_type T;
453
454   if (first >= last) {
455     return makeFuture(std::vector<Try<T>>());
456   }
457   size_t n = std::distance(first, last);
458
459   auto ctx = new detail::WhenAllContext<T>();
460
461   ctx->results.resize(n);
462
463   auto f_saved = ctx->p.getFuture();
464
465   for (size_t i = 0; first != last; ++first, ++i) {
466      assert(i < n);
467      auto& f = *first;
468      f.setCallback_([ctx, i, n](Try<T>&& t) {
469          ctx->results[i] = std::move(t);
470          if (++ctx->count == n) {
471            ctx->p.setValue(std::move(ctx->results));
472            delete ctx;
473          }
474        });
475   }
476
477   return f_saved;
478 }
479
480 template <class InputIterator>
481 Future<
482   std::pair<size_t,
483             Try<
484               typename
485               std::iterator_traits<InputIterator>::value_type::value_type> > >
486 whenAny(InputIterator first, InputIterator last) {
487   typedef
488     typename std::iterator_traits<InputIterator>::value_type::value_type T;
489
490   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
491   auto f_saved = ctx->p.getFuture();
492
493   for (size_t i = 0; first != last; first++, i++) {
494     auto& f = *first;
495     f.setCallback_([i, ctx](Try<T>&& t) {
496       if (!ctx->done.exchange(true)) {
497         ctx->p.setValue(std::make_pair(i, std::move(t)));
498       }
499       ctx->decref();
500     });
501   }
502
503   return f_saved;
504 }
505
506 template <class InputIterator>
507 Future<std::vector<std::pair<size_t, Try<typename
508   std::iterator_traits<InputIterator>::value_type::value_type>>>>
509 whenN(InputIterator first, InputIterator last, size_t n) {
510   typedef typename
511     std::iterator_traits<InputIterator>::value_type::value_type T;
512   typedef std::vector<std::pair<size_t, Try<T>>> V;
513
514   struct ctx_t {
515     V v;
516     size_t completed;
517     Promise<V> p;
518   };
519   auto ctx = std::make_shared<ctx_t>();
520   ctx->completed = 0;
521
522   // for each completed Future, increase count and add to vector, until we
523   // have n completed futures at which point we fulfil our Promise with the
524   // vector
525   auto it = first;
526   size_t i = 0;
527   while (it != last) {
528     it->then([ctx, n, i](Try<T>&& t) {
529       auto& v = ctx->v;
530       auto c = ++ctx->completed;
531       if (c <= n) {
532         assert(ctx->v.size() < n);
533         v.push_back(std::make_pair(i, std::move(t)));
534         if (c == n) {
535           ctx->p.fulfilTry(Try<V>(std::move(v)));
536         }
537       }
538     });
539
540     it++;
541     i++;
542   }
543
544   if (i < n) {
545     ctx->p.setException(std::runtime_error("Not enough futures"));
546   }
547
548   return ctx->p.getFuture();
549 }
550
551 namespace {
552   template <class T>
553   void getWaitHelper(Future<T>* f) {
554     // If we already have a value do the cheap thing
555     if (f->isReady()) {
556       return;
557     }
558
559     folly::Baton<> baton;
560     f->then([&](Try<T> const&) {
561       baton.post();
562     });
563     baton.wait();
564   }
565
566   template <class T>
567   Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
568     // TODO make and use variadic whenAny #5877971
569     Promise<T> p;
570     auto token = std::make_shared<std::atomic<bool>>();
571     folly::Baton<> baton;
572
573     folly::detail::getTimekeeperSingleton()->after(dur)
574       .then([&,token](Try<void> const& t) {
575         if (token->exchange(true) == false) {
576           if (t.hasException()) {
577             p.setException(std::move(t.exception()));
578           } else {
579             p.setException(TimedOut());
580           }
581           baton.post();
582         }
583       });
584
585     f->then([&, token](Try<T>&& t) {
586       if (token->exchange(true) == false) {
587         p.fulfilTry(std::move(t));
588         baton.post();
589       }
590     });
591
592     baton.wait();
593     return p.getFuture();
594   }
595 }
596
597 template <class T>
598 T Future<T>::get() {
599   getWaitHelper(this);
600
601   // Big assumption here: the then() call above, since it doesn't move out
602   // the value, leaves us with a value to return here. This would be a big
603   // no-no in user code, but I'm invoking internal developer privilege. This
604   // is slightly more efficient (save a move()) especially if there's an
605   // exception (save a throw).
606   return std::move(value());
607 }
608
609 template <>
610 inline void Future<void>::get() {
611   getWaitHelper(this);
612   value();
613 }
614
615 template <class T>
616 T Future<T>::get(Duration dur) {
617   return std::move(getWaitTimeoutHelper(this, dur).value());
618 }
619
620 template <>
621 inline void Future<void>::get(Duration dur) {
622   getWaitTimeoutHelper(this, dur).value();
623 }
624
625 template <class T>
626 T Future<T>::getVia(DrivableExecutor* e) {
627   while (!isReady()) {
628     e->drive();
629   }
630   return std::move(value());
631 }
632
633 template <>
634 inline void Future<void>::getVia(DrivableExecutor* e) {
635   while (!isReady()) {
636     e->drive();
637   }
638   value();
639 }
640
641 template <class T>
642 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
643   return within(dur, TimedOut(), tk);
644 }
645
646 template <class T>
647 template <class E>
648 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
649
650   struct Context {
651     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
652     E exception;
653     Promise<T> promise;
654     std::atomic<bool> token;
655   };
656   auto ctx = std::make_shared<Context>(std::move(e));
657
658   if (!tk) {
659     tk = folly::detail::getTimekeeperSingleton();
660   }
661
662   tk->after(dur)
663     .then([ctx](Try<void> const& t) {
664       if (ctx->token.exchange(true) == false) {
665         if (t.hasException()) {
666           ctx->promise.setException(std::move(t.exception()));
667         } else {
668           ctx->promise.setException(std::move(ctx->exception));
669         }
670       }
671     });
672
673   this->then([ctx](Try<T>&& t) {
674     if (ctx->token.exchange(true) == false) {
675       ctx->promise.fulfilTry(std::move(t));
676     }
677   });
678
679   return ctx->promise.getFuture();
680 }
681
682 template <class T>
683 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
684   return whenAll(*this, futures::sleep(dur, tk))
685     .then([](std::tuple<Try<T>, Try<void>> tup) {
686       Try<T>& t = std::get<0>(tup);
687       return makeFuture<T>(std::move(t));
688     });
689 }
690
691 template <class T>
692 Future<T> Future<T>::wait() {
693   Baton<> baton;
694   auto done = then([&](Try<T> t) {
695     baton.post();
696     return makeFuture(std::move(t));
697   });
698   baton.wait();
699   while (!done.isReady()) {
700     // There's a race here between the return here and the actual finishing of
701     // the future. f is completed, but the setup may not have finished on done
702     // after the baton has posted.
703     std::this_thread::yield();
704   }
705   return done;
706 }
707
708 template <class T>
709 Future<T> Future<T>::wait(Duration dur) {
710   auto baton = std::make_shared<Baton<>>();
711   auto done = then([baton](Try<T> t) {
712     baton->post();
713     return makeFuture(std::move(t));
714   });
715   // Let's preserve the invariant that if we did not timeout (timed_wait returns
716   // true), then the returned Future is complete when it is returned to the
717   // caller. We need to wait out the race for that Future to complete.
718   if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
719     while (!done.isReady()) {
720       std::this_thread::yield();
721     }
722   }
723   return done;
724 }
725
726 template <class T>
727 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
728   while (!isReady()) {
729     e->drive();
730   }
731   return *this;
732 }
733
734 template <class T>
735 Future<T> Future<T>::waitVia(DrivableExecutor* e) && {
736   while (!isReady()) {
737     e->drive();
738   }
739   return std::move(*this);
740 }
741
742 }
743
744 // I haven't included a Future<T&> specialization because I don't forsee us
745 // using it, however it is not difficult to add when needed. Refer to
746 // Future<void> for guidance. std::future and boost::future code would also be
747 // instructive.