(futures) Make executors sticky
[folly.git] / folly / futures / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/Baton.h>
23 #include <folly/futures/detail/Core.h>
24 #include <folly/futures/Timekeeper.h>
25
26 namespace folly {
27
28 class Timekeeper;
29
30 namespace detail {
31   Timekeeper* getTimekeeperSingleton();
32 }
33
34 template <class T>
35 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
36   *this = std::move(other);
37 }
38
39 template <class T>
40 Future<T>& Future<T>::operator=(Future<T>&& other) {
41   std::swap(core_, other.core_);
42   return *this;
43 }
44
45 template <class T>
46 template <class F>
47 Future<T>::Future(
48   const typename std::enable_if<!std::is_void<F>::value, F>::type& val)
49     : core_(nullptr) {
50   Promise<F> p;
51   p.setValue(val);
52   *this = p.getFuture();
53 }
54
55 template <class T>
56 template <class F>
57 Future<T>::Future(
58   typename std::enable_if<!std::is_void<F>::value, F>::type&& val)
59     : core_(nullptr) {
60   Promise<F> p;
61   p.setValue(std::forward<F>(val));
62   *this = p.getFuture();
63 }
64
65 template <>
66 template <class F,
67           typename std::enable_if<std::is_void<F>::value, int>::type>
68 Future<void>::Future() : core_(nullptr) {
69   Promise<void> p;
70   p.setValue();
71   *this = p.getFuture();
72 }
73
74
75 template <class T>
76 Future<T>::~Future() {
77   detach();
78 }
79
80 template <class T>
81 void Future<T>::detach() {
82   if (core_) {
83     core_->detachFuture();
84     core_ = nullptr;
85   }
86 }
87
88 template <class T>
89 void Future<T>::throwIfInvalid() const {
90   if (!core_)
91     throw NoState();
92 }
93
94 template <class T>
95 template <class F>
96 void Future<T>::setCallback_(F&& func) {
97   throwIfInvalid();
98   core_->setCallback(std::move(func));
99 }
100
101 // Variant: returns a value
102 // e.g. f.then([](Try<T>&& t){ return t.value(); });
103 template <class T>
104 template <typename F, typename R, bool isTry, typename... Args>
105 typename std::enable_if<!R::ReturnsFuture::value, typename R::Return>::type
106 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
107   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
108   typedef typename R::ReturnsFuture::Inner B;
109
110   throwIfInvalid();
111
112   // wrap these so we can move them into the lambda
113   folly::MoveWrapper<Promise<B>> p;
114   folly::MoveWrapper<F> funcm(std::forward<F>(func));
115
116   // grab the Future now before we lose our handle on the Promise
117   auto f = p->getFuture();
118   if (getExecutor()) {
119     f.setExecutor(getExecutor());
120   }
121
122   /* This is a bit tricky.
123
124      We can't just close over *this in case this Future gets moved. So we
125      make a new dummy Future. We could figure out something more
126      sophisticated that avoids making a new Future object when it can, as an
127      optimization. But this is correct.
128
129      core_ can't be moved, it is explicitly disallowed (as is copying). But
130      if there's ever a reason to allow it, this is one place that makes that
131      assumption and would need to be fixed. We use a standard shared pointer
132      for core_ (by copying it in), which means in essence obj holds a shared
133      pointer to itself.  But this shouldn't leak because Promise will not
134      outlive the continuation, because Promise will setException() with a
135      broken Promise if it is destructed before completed. We could use a
136      weak pointer but it would have to be converted to a shared pointer when
137      func is executed (because the Future returned by func may possibly
138      persist beyond the callback, if it gets moved), and so it is an
139      optimization to just make it shared from the get-go.
140
141      We have to move in the Promise and func using the MoveWrapper
142      hack. (func could be copied but it's a big drag on perf).
143
144      Two subtle but important points about this design. detail::Core has no
145      back pointers to Future or Promise, so if Future or Promise get moved
146      (and they will be moved in performant code) we don't have to do
147      anything fancy. And because we store the continuation in the
148      detail::Core, not in the Future, we can execute the continuation even
149      after the Future has gone out of scope. This is an intentional design
150      decision. It is likely we will want to be able to cancel a continuation
151      in some circumstances, but I think it should be explicit not implicit
152      in the destruction of the Future used to create it.
153      */
154   setCallback_(
155     [p, funcm](Try<T>&& t) mutable {
156       if (!isTry && t.hasException()) {
157         p->setException(std::move(t.exception()));
158       } else {
159         p->fulfil([&]() {
160           return (*funcm)(t.template get<isTry, Args>()...);
161         });
162       }
163     });
164
165   return f;
166 }
167
168 // Variant: returns a Future
169 // e.g. f.then([](T&& t){ return makeFuture<T>(t); });
170 template <class T>
171 template <typename F, typename R, bool isTry, typename... Args>
172 typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
173 Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
174   static_assert(sizeof...(Args) <= 1, "Then must take zero/one argument");
175   typedef typename R::ReturnsFuture::Inner B;
176
177   throwIfInvalid();
178
179   // wrap these so we can move them into the lambda
180   folly::MoveWrapper<Promise<B>> p;
181   folly::MoveWrapper<F> funcm(std::forward<F>(func));
182
183   // grab the Future now before we lose our handle on the Promise
184   auto f = p->getFuture();
185   if (getExecutor()) {
186     f.setExecutor(getExecutor());
187   }
188
189   setCallback_(
190     [p, funcm](Try<T>&& t) mutable {
191       if (!isTry && t.hasException()) {
192         p->setException(std::move(t.exception()));
193       } else {
194         try {
195           auto f2 = (*funcm)(t.template get<isTry, Args>()...);
196           // that didn't throw, now we can steal p
197           f2.setCallback_([p](Try<B>&& b) mutable {
198             p->fulfilTry(std::move(b));
199           });
200         } catch (const std::exception& e) {
201           p->setException(exception_wrapper(std::current_exception(), e));
202         } catch (...) {
203           p->setException(exception_wrapper(std::current_exception()));
204         }
205       }
206     });
207
208   return f;
209 }
210
211 template <typename T>
212 template <typename R, typename Caller, typename... Args>
213   Future<typename isFuture<R>::Inner>
214 Future<T>::then(R(Caller::*func)(Args...), Caller *instance) {
215   typedef typename std::remove_cv<
216     typename std::remove_reference<
217       typename detail::ArgType<Args...>::FirstArg>::type>::type FirstArg;
218   return then([instance, func](Try<T>&& t){
219     return (instance->*func)(t.template get<isTry<FirstArg>::value, Args>()...);
220   });
221 }
222
223 template <class T>
224 Future<void> Future<T>::then() {
225   return then([] (Try<T>&& t) {});
226 }
227
228 // onError where the callback returns T
229 template <class T>
230 template <class F>
231 typename std::enable_if<
232   !detail::Extract<F>::ReturnsFuture::value,
233   Future<T>>::type
234 Future<T>::onError(F&& func) {
235   typedef typename detail::Extract<F>::FirstArg Exn;
236   static_assert(
237       std::is_same<typename detail::Extract<F>::RawReturn, T>::value,
238       "Return type of onError callback must be T or Future<T>");
239
240   Promise<T> p;
241   auto f = p.getFuture();
242   auto pm = folly::makeMoveWrapper(std::move(p));
243   auto funcm = folly::makeMoveWrapper(std::move(func));
244   setCallback_([pm, funcm](Try<T>&& t) mutable {
245     if (!t.template withException<Exn>([&] (Exn& e) {
246           pm->fulfil([&]{
247             return (*funcm)(e);
248           });
249         })) {
250       pm->fulfilTry(std::move(t));
251     }
252   });
253
254   return f;
255 }
256
257 // onError where the callback returns Future<T>
258 template <class T>
259 template <class F>
260 typename std::enable_if<
261   detail::Extract<F>::ReturnsFuture::value,
262   Future<T>>::type
263 Future<T>::onError(F&& func) {
264   static_assert(
265       std::is_same<typename detail::Extract<F>::Return, Future<T>>::value,
266       "Return type of onError callback must be T or Future<T>");
267   typedef typename detail::Extract<F>::FirstArg Exn;
268
269   Promise<T> p;
270   auto f = p.getFuture();
271   auto pm = folly::makeMoveWrapper(std::move(p));
272   auto funcm = folly::makeMoveWrapper(std::move(func));
273   setCallback_([pm, funcm](Try<T>&& t) mutable {
274     if (!t.template withException<Exn>([&] (Exn& e) {
275           try {
276             auto f2 = (*funcm)(e);
277             f2.setCallback_([pm](Try<T>&& t2) mutable {
278               pm->fulfilTry(std::move(t2));
279             });
280           } catch (const std::exception& e2) {
281             pm->setException(exception_wrapper(std::current_exception(), e2));
282           } catch (...) {
283             pm->setException(exception_wrapper(std::current_exception()));
284           }
285         })) {
286       pm->fulfilTry(std::move(t));
287     }
288   });
289
290   return f;
291 }
292
293 template <class T>
294 template <class F>
295 Future<T> Future<T>::onTimeout(Duration dur, F&& func, Timekeeper* tk) {
296   auto funcw = folly::makeMoveWrapper(std::forward<F>(func));
297   return within(dur, tk)
298     .onError([funcw](TimedOut const&) { return (*funcw)(); });
299 }
300
301 template <class T>
302 typename std::add_lvalue_reference<T>::type Future<T>::value() {
303   throwIfInvalid();
304
305   return core_->getTry().value();
306 }
307
308 template <class T>
309 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
310   throwIfInvalid();
311
312   return core_->getTry().value();
313 }
314
315 template <class T>
316 Try<T>& Future<T>::getTry() {
317   throwIfInvalid();
318
319   return core_->getTry();
320 }
321
322 template <class T>
323 template <typename Executor>
324 inline Future<T> Future<T>::via(Executor* executor) && {
325   throwIfInvalid();
326
327   setExecutor(executor);
328
329   return std::move(*this);
330 }
331
332 template <class T>
333 template <typename Executor>
334 inline Future<T> Future<T>::via(Executor* executor) & {
335   throwIfInvalid();
336
337   MoveWrapper<Promise<T>> p;
338   auto f = p->getFuture();
339   then([p](Try<T>&& t) mutable { p->fulfilTry(std::move(t)); });
340   return std::move(f).via(executor);
341 }
342
343 template <class T>
344 bool Future<T>::isReady() const {
345   throwIfInvalid();
346   return core_->ready();
347 }
348
349 template <class T>
350 void Future<T>::raise(exception_wrapper exception) {
351   core_->raise(std::move(exception));
352 }
353
354 // makeFuture
355
356 template <class T>
357 Future<typename std::decay<T>::type> makeFuture(T&& t) {
358   Promise<typename std::decay<T>::type> p;
359   p.setValue(std::forward<T>(t));
360   return p.getFuture();
361 }
362
363 inline // for multiple translation units
364 Future<void> makeFuture() {
365   Promise<void> p;
366   p.setValue();
367   return p.getFuture();
368 }
369
370 template <class F>
371 auto makeFutureTry(
372     F&& func,
373     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
374     -> Future<decltype(func())> {
375   Promise<decltype(func())> p;
376   p.fulfil(
377     [&func]() {
378       return (func)();
379     });
380   return p.getFuture();
381 }
382
383 template <class F>
384 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
385   F copy = func;
386   return makeFutureTry(std::move(copy));
387 }
388
389 template <class T>
390 Future<T> makeFuture(std::exception_ptr const& e) {
391   Promise<T> p;
392   p.setException(e);
393   return p.getFuture();
394 }
395
396 template <class T>
397 Future<T> makeFuture(exception_wrapper ew) {
398   Promise<T> p;
399   p.setException(std::move(ew));
400   return p.getFuture();
401 }
402
403 template <class T, class E>
404 typename std::enable_if<std::is_base_of<std::exception, E>::value,
405                         Future<T>>::type
406 makeFuture(E const& e) {
407   Promise<T> p;
408   p.setException(make_exception_wrapper<E>(e));
409   return p.getFuture();
410 }
411
412 template <class T>
413 Future<T> makeFuture(Try<T>&& t) {
414   Promise<typename std::decay<T>::type> p;
415   p.fulfilTry(std::move(t));
416   return p.getFuture();
417 }
418
419 template <>
420 inline Future<void> makeFuture(Try<void>&& t) {
421   if (t.hasException()) {
422     return makeFuture<void>(std::move(t.exception()));
423   } else {
424     return makeFuture();
425   }
426 }
427
428 // via
429 template <typename Executor>
430 Future<void> via(Executor* executor) {
431   return makeFuture().via(executor);
432 }
433
434 // when (variadic)
435
436 template <typename... Fs>
437 typename detail::VariadicContext<
438   typename std::decay<Fs>::type::value_type...>::type
439 whenAll(Fs&&... fs)
440 {
441   auto ctx =
442     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
443   ctx->total = sizeof...(fs);
444   auto f_saved = ctx->p.getFuture();
445   detail::whenAllVariadicHelper(ctx,
446     std::forward<typename std::decay<Fs>::type>(fs)...);
447   return f_saved;
448 }
449
450 // when (iterator)
451
452 template <class InputIterator>
453 Future<
454   std::vector<
455   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
456 whenAll(InputIterator first, InputIterator last)
457 {
458   typedef
459     typename std::iterator_traits<InputIterator>::value_type::value_type T;
460
461   if (first >= last) {
462     return makeFuture(std::vector<Try<T>>());
463   }
464   size_t n = std::distance(first, last);
465
466   auto ctx = new detail::WhenAllContext<T>();
467
468   ctx->results.resize(n);
469
470   auto f_saved = ctx->p.getFuture();
471
472   for (size_t i = 0; first != last; ++first, ++i) {
473      assert(i < n);
474      auto& f = *first;
475      f.setCallback_([ctx, i, n](Try<T>&& t) {
476          ctx->results[i] = std::move(t);
477          if (++ctx->count == n) {
478            ctx->p.setValue(std::move(ctx->results));
479            delete ctx;
480          }
481        });
482   }
483
484   return f_saved;
485 }
486
487 template <class InputIterator>
488 Future<
489   std::pair<size_t,
490             Try<
491               typename
492               std::iterator_traits<InputIterator>::value_type::value_type> > >
493 whenAny(InputIterator first, InputIterator last) {
494   typedef
495     typename std::iterator_traits<InputIterator>::value_type::value_type T;
496
497   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
498   auto f_saved = ctx->p.getFuture();
499
500   for (size_t i = 0; first != last; first++, i++) {
501     auto& f = *first;
502     f.setCallback_([i, ctx](Try<T>&& t) {
503       if (!ctx->done.exchange(true)) {
504         ctx->p.setValue(std::make_pair(i, std::move(t)));
505       }
506       ctx->decref();
507     });
508   }
509
510   return f_saved;
511 }
512
513 template <class InputIterator>
514 Future<std::vector<std::pair<size_t, Try<typename
515   std::iterator_traits<InputIterator>::value_type::value_type>>>>
516 whenN(InputIterator first, InputIterator last, size_t n) {
517   typedef typename
518     std::iterator_traits<InputIterator>::value_type::value_type T;
519   typedef std::vector<std::pair<size_t, Try<T>>> V;
520
521   struct ctx_t {
522     V v;
523     size_t completed;
524     Promise<V> p;
525   };
526   auto ctx = std::make_shared<ctx_t>();
527   ctx->completed = 0;
528
529   // for each completed Future, increase count and add to vector, until we
530   // have n completed futures at which point we fulfil our Promise with the
531   // vector
532   auto it = first;
533   size_t i = 0;
534   while (it != last) {
535     it->then([ctx, n, i](Try<T>&& t) {
536       auto& v = ctx->v;
537       auto c = ++ctx->completed;
538       if (c <= n) {
539         assert(ctx->v.size() < n);
540         v.push_back(std::make_pair(i, std::move(t)));
541         if (c == n) {
542           ctx->p.fulfilTry(Try<V>(std::move(v)));
543         }
544       }
545     });
546
547     it++;
548     i++;
549   }
550
551   if (i < n) {
552     ctx->p.setException(std::runtime_error("Not enough futures"));
553   }
554
555   return ctx->p.getFuture();
556 }
557
558 namespace {
559   template <class T>
560   void getWaitHelper(Future<T>* f) {
561     // If we already have a value do the cheap thing
562     if (f->isReady()) {
563       return;
564     }
565
566     folly::Baton<> baton;
567     f->then([&](Try<T> const&) {
568       baton.post();
569     });
570     baton.wait();
571   }
572
573   template <class T>
574   Future<T> getWaitTimeoutHelper(Future<T>* f, Duration dur) {
575     // TODO make and use variadic whenAny #5877971
576     Promise<T> p;
577     auto token = std::make_shared<std::atomic<bool>>();
578     folly::Baton<> baton;
579
580     folly::detail::getTimekeeperSingleton()->after(dur)
581       .then([&,token](Try<void> const& t) {
582         if (token->exchange(true) == false) {
583           if (t.hasException()) {
584             p.setException(std::move(t.exception()));
585           } else {
586             p.setException(TimedOut());
587           }
588           baton.post();
589         }
590       });
591
592     f->then([&, token](Try<T>&& t) {
593       if (token->exchange(true) == false) {
594         p.fulfilTry(std::move(t));
595         baton.post();
596       }
597     });
598
599     baton.wait();
600     return p.getFuture();
601   }
602 }
603
604 template <class T>
605 T Future<T>::get() {
606   getWaitHelper(this);
607
608   // Big assumption here: the then() call above, since it doesn't move out
609   // the value, leaves us with a value to return here. This would be a big
610   // no-no in user code, but I'm invoking internal developer privilege. This
611   // is slightly more efficient (save a move()) especially if there's an
612   // exception (save a throw).
613   return std::move(value());
614 }
615
616 template <>
617 inline void Future<void>::get() {
618   getWaitHelper(this);
619   value();
620 }
621
622 template <class T>
623 T Future<T>::get(Duration dur) {
624   return std::move(getWaitTimeoutHelper(this, dur).value());
625 }
626
627 template <>
628 inline void Future<void>::get(Duration dur) {
629   getWaitTimeoutHelper(this, dur).value();
630 }
631
632 template <class T>
633 T Future<T>::getVia(DrivableExecutor* e) {
634   while (!isReady()) {
635     e->drive();
636   }
637   return std::move(value());
638 }
639
640 template <>
641 inline void Future<void>::getVia(DrivableExecutor* e) {
642   while (!isReady()) {
643     e->drive();
644   }
645   value();
646 }
647
648 template <class T>
649 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
650   return within(dur, TimedOut(), tk);
651 }
652
653 template <class T>
654 template <class E>
655 Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
656
657   struct Context {
658     Context(E ex) : exception(std::move(ex)), promise(), token(false) {}
659     E exception;
660     Promise<T> promise;
661     std::atomic<bool> token;
662   };
663   auto ctx = std::make_shared<Context>(std::move(e));
664
665   if (!tk) {
666     tk = folly::detail::getTimekeeperSingleton();
667   }
668
669   tk->after(dur)
670     .then([ctx](Try<void> const& t) {
671       if (ctx->token.exchange(true) == false) {
672         if (t.hasException()) {
673           ctx->promise.setException(std::move(t.exception()));
674         } else {
675           ctx->promise.setException(std::move(ctx->exception));
676         }
677       }
678     });
679
680   this->then([ctx](Try<T>&& t) {
681     if (ctx->token.exchange(true) == false) {
682       ctx->promise.fulfilTry(std::move(t));
683     }
684   });
685
686   return ctx->promise.getFuture();
687 }
688
689 template <class T>
690 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
691   return whenAll(*this, futures::sleep(dur, tk))
692     .then([](std::tuple<Try<T>, Try<void>> tup) {
693       Try<T>& t = std::get<0>(tup);
694       return makeFuture<T>(std::move(t));
695     });
696 }
697
698 namespace detail {
699
700 template <class T>
701 void waitImpl(Future<T>& f) {
702   Baton<> baton;
703   f = f.then([&](Try<T> t) {
704     baton.post();
705     return makeFuture(std::move(t));
706   });
707   baton.wait();
708   // There's a race here between the return here and the actual finishing of
709   // the future. f is completed, but the setup may not have finished on done
710   // after the baton has posted.
711   while (!f.isReady()) {
712     std::this_thread::yield();
713   }
714 }
715
716 template <class T>
717 void waitImpl(Future<T>& f, Duration dur) {
718   auto baton = std::make_shared<Baton<>>();
719   f = f.then([baton](Try<T> t) {
720     baton->post();
721     return makeFuture(std::move(t));
722   });
723   // Let's preserve the invariant that if we did not timeout (timed_wait returns
724   // true), then the returned Future is complete when it is returned to the
725   // caller. We need to wait out the race for that Future to complete.
726   if (baton->timed_wait(std::chrono::system_clock::now() + dur)) {
727     while (!f.isReady()) {
728       std::this_thread::yield();
729     }
730   }
731 }
732
733 template <class T>
734 void waitViaImpl(Future<T>& f, DrivableExecutor* e) {
735   while (!f.isReady()) {
736     e->drive();
737   }
738 }
739
740 } // detail
741
742 template <class T>
743 Future<T>& Future<T>::wait() & {
744   detail::waitImpl(*this);
745   return *this;
746 }
747
748 template <class T>
749 Future<T>&& Future<T>::wait() && {
750   detail::waitImpl(*this);
751   return std::move(*this);
752 }
753
754 template <class T>
755 Future<T>& Future<T>::wait(Duration dur) & {
756   detail::waitImpl(*this, dur);
757   return *this;
758 }
759
760 template <class T>
761 Future<T>&& Future<T>::wait(Duration dur) && {
762   detail::waitImpl(*this, dur);
763   return std::move(*this);
764 }
765
766 template <class T>
767 Future<T>& Future<T>::waitVia(DrivableExecutor* e) & {
768   detail::waitViaImpl(*this, e);
769   return *this;
770 }
771
772 template <class T>
773 Future<T>&& Future<T>::waitVia(DrivableExecutor* e) && {
774   detail::waitViaImpl(*this, e);
775   return std::move(*this);
776 }
777
778 namespace futures {
779
780   namespace {
781     template <class Z>
782     Future<Z> chainHelper(Future<Z> f) {
783       return f;
784     }
785
786     template <class Z, class F, class Fn, class... Callbacks>
787     Future<Z> chainHelper(F f, Fn fn, Callbacks... fns) {
788       return chainHelper<Z>(f.then(fn), fns...);
789     }
790   }
791
792   template <class A, class Z, class... Callbacks>
793   std::function<Future<Z>(Try<A>)>
794   chain(Callbacks... fns) {
795     MoveWrapper<Promise<A>> pw;
796     MoveWrapper<Future<Z>> fw(chainHelper<Z>(pw->getFuture(), fns...));
797     return [=](Try<A> t) mutable {
798       pw->fulfilTry(std::move(t));
799       return std::move(*fw);
800     };
801   }
802
803 }
804
805 } // namespace folly
806
807 // I haven't included a Future<T&> specialization because I don't forsee us
808 // using it, however it is not difficult to add when needed. Refer to
809 // Future<void> for guidance. std::future and boost::future code would also be
810 // instructive.